From 0ee0f3f9a57d5aaf526b83b300f228f749f2decb Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Tue, 4 Feb 2025 13:28:49 +0000 Subject: [PATCH 01/10] [XrdCeph] Add initial support for calculating Adler32 on-the-fly --- src/XrdCeph/XrdCephOss.cc | 12 +++++-- src/XrdCeph/XrdCephPosix.cc | 66 +++++++++++++++++++++++++++++++++++-- src/XrdCeph/XrdCephPosix.hh | 4 +++ 3 files changed, 78 insertions(+), 4 deletions(-) diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc index 5ff411d33f4..8c869b1db95 100644 --- a/src/XrdCeph/XrdCephOss.cc +++ b/src/XrdCeph/XrdCephOss.cc @@ -164,6 +164,8 @@ XrdCephOss::~XrdCephOss() { // declared and used in XrdCephPosix.cc extern unsigned int g_maxCephPoolIdx; extern unsigned int g_cephAioWaitThresh; +extern bool g_useAdler32; +extern bool g_useCRC32; int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { int NoGo = 0; @@ -361,9 +363,15 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { m_configPoolnames = var; } else { Eroute.Emsg("Config", "Missing value for ceph.reportingpools in config file", configfn); - return 1; + return 1; } - } + } + if (!strcmp(var, "ceph.useadler32")) { + g_useAdler32 = true; + } // useadler32 + if (!strcmp(var, "ceph.usecrc32")) { + g_useCRC32 = true; + } // usecrc32 } // while // Now check if any errors occurred during file i/o diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 65ee444a114..a9205c5a0a5 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -54,7 +54,38 @@ #include "XrdCeph/XrdCephPosix.hh" #include "XrdCeph/XrdCephBulkAioRead.hh" #include "XrdSfs/XrdSfsFlags.hh" // for the OFFLINE flag status +#include "XrdCks/XrdCksData.hh" +#include + +using namespace std; + +int setXrdCksAttr(const int fd, const char* cstype, const char* ckSumbuf) { + + int rc = -1; + + std::vector attrData = XrdCksAttrData(cstype, ckSumbuf, time(0)); + + rc = ceph_posix_fsetxattr(fd, XrdCksAttrName(cstype).c_str(), + 
attrData.data(), attrData.size(), 0); + + return rc; +} + + +std::vector checksumData(const char* algName, const int algLen, const char* ckBuf) { + + XrdCksData xd; + xd.Set(algName); + xd.Set(ckBuf, algLen); + xd.fmTime = time(0); + xd.csTime = xd.fmTime; + + auto attrData = std::vector( (char *)&xd, ((char *)&xd)+sizeof(xd)); + + return attrData; + +} /// small struct for directory listing struct DirIterator { @@ -111,6 +142,10 @@ XrdSysMutex g_init_mutex; //JW Counter for number of times a given cluster is resolved. std::map g_idxCntr; +//IJJ: Falg whether to calculate Adler32 checksum +bool g_useAdler32; +bool g_useCRC32; + /// Accessor to next ceph pool index /// Note that this is not thread safe, but we do not care /// as we only want a rough load balancing @@ -749,6 +784,9 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode } } // At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it + if (g_useAdler32) { + fr.adler32 = adler32(0L, Z_NULL, 0); + } int fd = insertFileRef(fr); logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname); return fd; @@ -777,6 +815,20 @@ int ceph_posix_close(int fd) { fr->asyncWrCompletionCount, fr->asyncWrStartCount, fr->bytesAsyncWritePending, fr->asyncRdCompletionCount, fr->asyncRdStartCount, fr->bytesWritten, fr->maxOffsetWritten, fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge)); + if (g_useAdler32) { + char ckBuf[8+1]; + + snprintf(ckBuf, 8+1, "%08lx", fr->adler32); + ckBuf[8] = '\0'; + logwrapper((char*)"ceph_close: Adler32 checksum = %s", ckBuf); + + int rc = setXrdCksAttr(fd, "adler32", (const char*)ckBuf); + + if (rc != 0) { + logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); + } + + } deleteFileRef(fd, *fr); return 0; } else { @@ -838,6 +890,9 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) { fr->wrcount++; 
fr->bytesWritten+=count; if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten); + if (g_useAdler32) { + fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); + } return count; } else { return -EBADF; @@ -848,7 +903,8 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) CephFileRef* fr = getFileRef(fd); if (fr) { // TODO implement proper logging level for this plugin - this should be only debug - //logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count); + //logwrapper((char*)"ceph_posix_pwrite: for fd %d, count=%d", fd, count); + if ((fr->flags & O_ACCMODE) == O_RDONLY) { return -EBADF; } @@ -864,6 +920,9 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) fr->wrcount++; fr->bytesWritten+=count; if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten); + if (g_useAdler32) { + fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); + } return count; } else { return -EBADF; @@ -908,7 +967,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { const char *buf = (const char*)aiop->sfsAio.aio_buf; size_t offset = aiop->sfsAio.aio_offset; // TODO implement proper logging level for this plugin - this should be only debug - //logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count); + logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count); if ((fr->flags & O_ACCMODE) == O_RDONLY) { return -EBADF; } @@ -938,6 +997,9 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { fr->asyncWrStartCount++; ::gettimeofday(&fr->lastAsyncSubmission, nullptr); fr->bytesAsyncWritePending+=count; + if (g_useAdler32) { + fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); + } return rc; } else { return -EBADF; diff --git a/src/XrdCeph/XrdCephPosix.hh b/src/XrdCeph/XrdCephPosix.hh index 5d01129b6ae..8e0c7df35c8 100644 --- a/src/XrdCeph/XrdCephPosix.hh +++ 
b/src/XrdCeph/XrdCephPosix.hh @@ -39,6 +39,7 @@ #include "XrdSys/XrdSysPthread.hh" #include "XrdOuc/XrdOucIOVec.hh" +#include // simple logging for XrdCeph buffering code #define XRDCEPHLOGLEVEL 1 @@ -107,6 +108,7 @@ struct CephFile { unsigned int nbStripes; unsigned long long stripeUnit; unsigned long long objectSize; + }; struct CephFileRef : CephFile { @@ -127,6 +129,8 @@ struct CephFileRef : CephFile { ::timeval lastAsyncSubmission; double longestAsyncWriteTime; double longestCallbackInvocation; + uLong adler32; + uLong crc32; }; #endif // __XRD_CEPH_POSIX__ From d8cfcf4fa660ad2d98caed9dbe8a4b6a922d9e88 Mon Sep 17 00:00:00 2001 From: Jo-stfc <71326101+Jo-stfc@users.noreply.github.com> Date: Mon, 16 Dec 2024 11:34:19 +0000 Subject: [PATCH 02/10] [XrdCeph] add useratpool in server logs in case of failure --- src/XrdCeph/XrdCephPosix.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index a9205c5a0a5..1047b99e0b6 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -564,6 +564,7 @@ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, con } int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx); if (rc != 0) { + logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, user@pool = %s", userAtPool); logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, rc = %d", rc); cluster->shutdown(); delete cluster; From 53b9ddaa8ce493d84042602d59fa5a30cf894dff Mon Sep 17 00:00:00 2001 From: Jyothish Thomas Date: Mon, 16 Dec 2024 11:58:22 +0000 Subject: [PATCH 03/10] [XrdCeph] clear checksum on write completion --- src/XrdCeph/XrdCephPosix.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 1047b99e0b6..07f1b2024d2 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -564,7 +564,7 @@ int checkAndCreateStriper(unsigned int 
cephPoolIdx, std::string &userAtPool, con } int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx); if (rc != 0) { - logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, user@pool = %s", userAtPool); + logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, user@pool = %s", userAtPool.c_str()); logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, rc = %d", rc); cluster->shutdown(); delete cluster; @@ -808,6 +808,9 @@ int ceph_posix_close(int fd) { lastAsyncAge = 1.0 * (now.tv_sec - fr->lastAsyncSubmission.tv_sec) + 0.000001 * (now.tv_usec - fr->lastAsyncSubmission.tv_usec); } + if (fr->bytesWritten > 0){ + ceph_posix_fremovexattr(fd,"XrdCks.adler32"); + } logwrapper((char*)"ceph_close: closed fd %d for file %s, read ops count %d, write ops count %d, " "async write ops %d/%d, async pending write bytes %ld, " "async read ops %d/%d, bytes written/max offset %ld/%ld, " From 52dee3965ceb6a369511c0834d99ae0ab6b14dfe Mon Sep 17 00:00:00 2001 From: Alexander Rogovskiy Date: Wed, 19 Feb 2025 10:14:32 +0000 Subject: [PATCH 04/10] [XrdCeph] feat: allow buffering layer to work only for writes It was noticed that for reads there may be some inefficiencies when xcache is involved. For writes, however, the buffering layer improved performance significantly. So, it makes sense to introduce the write-only mode. 
--- src/XrdCeph/XrdCephOss.cc | 2 +- src/XrdCeph/XrdCephOssBufferedFile.cc | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc index 8c869b1db95..2790d4957fd 100644 --- a/src/XrdCeph/XrdCephOss.cc +++ b/src/XrdCeph/XrdCephOss.cc @@ -350,7 +350,7 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { if (!Config.GetRest(parms, sizeof(parms)) || parms[0]) { Eroute.Emsg("Config", "readvalgname parameters will be ignored"); } - m_configBufferIOmode = var; // allowed values would be aio, io + m_configBufferIOmode = var; // allowed values would be aio, io, write-only-io } else { Eroute.Emsg("Config", "Missing value for ceph.bufferiomode in config file", configfn); return 1; diff --git a/src/XrdCeph/XrdCephOssBufferedFile.cc b/src/XrdCeph/XrdCephOssBufferedFile.cc index 0862d39b4d0..63169cd99b9 100644 --- a/src/XrdCeph/XrdCephOssBufferedFile.cc +++ b/src/XrdCeph/XrdCephOssBufferedFile.cc @@ -151,6 +151,10 @@ ssize_t XrdCephOssBufferedFile::Read(off_t offset, size_t blen) { ssize_t XrdCephOssBufferedFile::Read(void *buff, off_t offset, size_t blen) { size_t thread_id = std::hash{}(std::this_thread::get_id()); + if (m_bufferIOmode == "write-only-io") { + return m_xrdOssDF->Read(buff, offset, blen); + } + IXrdCephBufferAlg * buffer{nullptr}; // check for, and create if needed, a buffer { @@ -326,11 +330,11 @@ std::unique_ptr XrdCephOssBufferedFile::create std::unique_ptr cephio; if (m_bufferIOmode == "aio") { cephio = std::unique_ptr(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd)); - } else if (m_bufferIOmode == "io") { + } else if (m_bufferIOmode == "io" || m_bufferIOmode == "write-only-io") { cephio = std::unique_ptr(new CephIOAdapterRaw(cephbuffer.get(),m_fd, !m_cephoss->m_useDefaultPreadAlg)); } else { - BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " ); + BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io|write-only-io " ); 
m_xrdOssDF->Close(); return bufferAlg; // invalid instance; } From 2482227ba1ccd6af685951abc7f8fd3ab82a116a Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Fri, 28 Feb 2025 09:39:38 +0000 Subject: [PATCH 05/10] [XrdCeph] Change config options. The default is to not calculate the streamed checksum. Settings for streamed checksum allow calculation only, calc+log, and calc+log+store as extended attribute. Record streamed and readback checksums in a central checksums.log file. --- src/XrdCeph/XrdCephOss.cc | 89 +++++++++++++++++++++++++++++++++---- src/XrdCeph/XrdCephPosix.cc | 84 +++++++++++++++++++++++++++------- src/XrdCeph/XrdCephXAttr.cc | 68 ++++++++++++++++++++++++++-- 3 files changed, 214 insertions(+), 27 deletions(-) diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc index 2790d4957fd..f5d7223c1c0 100644 --- a/src/XrdCeph/XrdCephOss.cc +++ b/src/XrdCeph/XrdCephOss.cc @@ -128,6 +128,19 @@ ssize_t getNumericAttr(const char* const path, const char* attrName, const int m } +extern FILE *g_cksLogFile; + + +char *ts_rfc3339() { + + std::time_t now = std::time({}); + char timeString[std::size("yyyy-mm-dd hh:mm:ss")]; + std::strftime(std::data(timeString), std::size(timeString), + "%F %TZ", std::gmtime(&now)); + return strdup(timeString); +} + + extern "C" { XrdOss* @@ -164,8 +177,12 @@ XrdCephOss::~XrdCephOss() { // declared and used in XrdCephPosix.cc extern unsigned int g_maxCephPoolIdx; extern unsigned int g_cephAioWaitThresh; -extern bool g_useAdler32; -extern bool g_useCRC32; + +extern bool g_calcStreamedAdler32; +extern bool g_storeStreamedAdler32; +extern bool g_logStreamedAdler32; + + int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { int NoGo = 0; @@ -366,12 +383,67 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { return 1; } } - if (!strcmp(var, "ceph.useadler32")) { - g_useAdler32 = true; - } // useadler32 - if (!strcmp(var, "ceph.usecrc32")) { - g_useCRC32 = true; - } // usecrc32 + if 
(!strcmp(var, "ceph.streamed-cks-adler32")) { // Streaming Adler32 checksum + + + char hostname[32]; + (void)gethostname(hostname, 31); + Eroute.Emsg("Hostname is ", hostname); + var = Config.GetWord(); + if (var) { + +/* + * Currently, actions are simply additive: + * + * Store implies calculate, log, store + * Log implies calculate, log + * Calc implies calculate + * + * Might want to make e.g. logging optional in the future, + * when storing is more prevalent. + * + * Instead of setting g_* flags in three conditionals, + * can switch to setting values in a single bitfield flag + * + */ + if (strstr(var, "calc")) { + g_calcStreamedAdler32 = true; + g_logStreamedAdler32 = false; + g_storeStreamedAdler32 = false; + } + if (strstr(var, "log")) { + g_calcStreamedAdler32 = true; + g_logStreamedAdler32 = true; + g_storeStreamedAdler32 = false; + } + if (strstr(var, "store")) { + g_calcStreamedAdler32 = true; + g_logStreamedAdler32 = true; + g_storeStreamedAdler32 = true; + } + + if (g_logStreamedAdler32) { + const char *cksLogFilename = "/var/log/xrootd/checksums/checksums.log"; + g_cksLogFile = fopen(cksLogFilename, "a"); + if (NULL == g_cksLogFile) { + g_logStreamedAdler32 = false; + Eroute.Emsg("Config cannot open file for logging checksum values and pathnames: ", cksLogFilename); + } else { + Eroute.Emsg("Opened file for logging checksum values and pathname: ", cksLogFilename); + } + } + } + }// "ceph.streamed-cks-adler32" + +/* + if (!strcmp(var, "ceph.streamed-cks-adler32-log")) { + var = Config.GetWord(); + // Test path for being writable after exiting parse loop... 
+ // + g_logStreamedAdler32StreamedPath = strdup(var); + } +*/ + } // while // Now check if any errors occurred during file i/o @@ -382,6 +454,7 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { configfn); } Config.Close(); + } return NoGo; } diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 07f1b2024d2..0a6feade611 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -58,6 +58,19 @@ #include +//#include "XrdCeph/XrdCephGlobals.hh" + +char *ts_rfc3339() { + + std::time_t now = std::time({}); + char timeString[std::size("yyyy-mm-dd hh:mm:ss")]; + std::strftime(std::data(timeString), std::size(timeString), + "%F %TZ", std::gmtime(&now)); + return strdup(timeString); +} + + + using namespace std; int setXrdCksAttr(const int fd, const char* cstype, const char* ckSumbuf) { @@ -142,9 +155,18 @@ XrdSysMutex g_init_mutex; //JW Counter for number of times a given cluster is resolved. std::map g_idxCntr; -//IJJ: Falg whether to calculate Adler32 checksum -bool g_useAdler32; -bool g_useCRC32; +//IJJ: Actions for Adler32 checksum + +extern bool g_calcStreamedAdler32; +extern bool g_logStreamedAdler32; +extern bool g_storeStreamedAdler32; +======= +bool g_calcStreamedAdler32; +bool g_logStreamedAdler32; +bool g_storeStreamedAdler32; +>>>>>>> 0aed50f31 ([XrdCeph] swap extern use to pass compilation) + +FILE *g_cksLogFile; /// Accessor to next ceph pool index /// Note that this is not thread safe, but we do not care @@ -785,7 +807,7 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode } } // At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it - if (g_useAdler32) { + if (g_calcStreamedAdler32) { fr.adler32 = adler32(0L, Z_NULL, 0); } int fd = insertFileRef(fr); @@ -796,6 +818,20 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode } + +const char* formatAdler32(unsigned long adler32) { + +#define 
ADLER32_LENGTH 8 + + char ckBuf[ADLER32_LENGTH+1]; + snprintf(ckBuf, ADLER32_LENGTH+1, "%0*lx", ADLER32_LENGTH, adler32); + ckBuf[ADLER32_LENGTH] = '\0'; + + return (const char*)strdup(ckBuf); + +} + + int ceph_posix_close(int fd) { CephFileRef* fr = getFileRef(fd); if (fr) { @@ -819,20 +855,31 @@ int ceph_posix_close(int fd) { fr->asyncWrCompletionCount, fr->asyncWrStartCount, fr->bytesAsyncWritePending, fr->asyncRdCompletionCount, fr->asyncRdStartCount, fr->bytesWritten, fr->maxOffsetWritten, fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge)); - if (g_useAdler32) { - char ckBuf[8+1]; - snprintf(ckBuf, 8+1, "%08lx", fr->adler32); - ckBuf[8] = '\0'; - logwrapper((char*)"ceph_close: Adler32 checksum = %s", ckBuf); + if (g_calcStreamedAdler32) { + const char* adler32str = formatAdler32(fr->adler32); + logwrapper((char*)"ceph_close: Adler32 streamed checksum = %s", adler32str); - int rc = setXrdCksAttr(fd, "adler32", (const char*)ckBuf); + if (g_logStreamedAdler32) { + const char *timestamp = ts_rfc3339(); // "2025-02-24 13:09:01+00:00" + const char *path = (fr->pool + ":" + fr->name).c_str(); + const char *cksMode = "streamed"; + const char *cksType = "adler32"; + const char *cksValue = adler32str; - if (rc != 0) { - logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); + fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", timestamp, path, cksMode, cksType, cksValue); + fflush(g_cksLogFile); } + if (g_storeStreamedAdler32) { + int rc = setXrdCksAttr(fd, "adler32", adler32str); + + if (rc != 0) { + logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); + } + } } + deleteFileRef(fd, *fr); return 0; } else { @@ -894,9 +941,12 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) { fr->wrcount++; fr->bytesWritten+=count; if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten); - if (g_useAdler32) { + if (g_calcStreamedAdler32) { fr->adler32 = 
adler32(fr->adler32, (const Bytef*)buf, count); + } else { + logwrapper((char*)"ceph_write: g_calcAdler is false!"); } + return count; } else { return -EBADF; @@ -919,14 +969,17 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) ceph::bufferlist bl; bl.append((const char*)buf, count); int rc = striper->write(fr->name, bl, count, offset); + if (rc) return rc; XrdSysMutexHelper lock(fr->statsMutex); fr->wrcount++; fr->bytesWritten+=count; if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten); - if (g_useAdler32) { + + if (g_calcStreamedAdler32) { fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); } + return count; } else { return -EBADF; @@ -1001,7 +1054,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { fr->asyncWrStartCount++; ::gettimeofday(&fr->lastAsyncSubmission, nullptr); fr->bytesAsyncWritePending+=count; - if (g_useAdler32) { + if (g_calcStreamedAdler32) { fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); } return rc; @@ -1609,7 +1662,6 @@ int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) { logwrapper((char*)"ceph_posix_unlink : %s", pathname); // start the timer auto timer_start = std::chrono::steady_clock::now(); - // minimal stat : only size and times are filled CephFile file = getCephFile(pathname, env); libradosstriper::RadosStriper *striper = getRadosStriper(file); diff --git a/src/XrdCeph/XrdCephXAttr.cc b/src/XrdCeph/XrdCephXAttr.cc index 4c2f60a0f96..3218ffa2254 100644 --- a/src/XrdCeph/XrdCephXAttr.cc +++ b/src/XrdCeph/XrdCephXAttr.cc @@ -28,6 +28,23 @@ #include "XrdOuc/XrdOucTrace.hh" #include "XrdCeph/XrdCephXAttr.hh" +#include "XrdCks/XrdCksData.hh" +#include + +//#include "XrdCeph/XrdCephGlobals.hh" + +FILE *g_cksLogFile; + +char *ts_rfc3339() { + + std::time_t now = std::time({}); + char timeString[std::size("yyyy-mm-dd hh:mm:ss")]; + std::strftime(std::data(timeString), std::size(timeString), + "%F %TZ", 
std::gmtime(&now)); + return strdup(timeString); +} + + XrdSysError XrdCephXattrEroute(0); XrdOucTrace XrdCephXattrTrace(&XrdCephXattrEroute); @@ -49,9 +66,35 @@ extern "C" XrdCephXattrEroute.Say("CephXattr loading failed with exception. Check the syntax of parameters : ", parms); return 0; } + + const char *cksLogFilename = "/var/log/xrootd/checksums/checksums.log"; + g_cksLogFile = fopen(cksLogFilename, "a"); + if (NULL == g_cksLogFile) { + XrdCephXattrEroute.Emsg("Config cannot open file for logging checksum values and pathnames: ", cksLogFilename); + } else { + XrdCephXattrEroute.Emsg("Opened file for logging checksum values and pathname: ", cksLogFilename); + } return new XrdCephXAttr(); } } +extern bool g_storeStreamedAdler32; + +constexpr char hex2ascii(char nibble) { return (0<= nibble && nibble<=9) ? nibble+'0' : nibble-10+'a'; } +constexpr char hiNibble(uint8_t hexbyte) { return (hexbyte & 0xf0) >> 4; } +constexpr char loNibble(uint8_t hexbyte) { return (hexbyte & 0x0f); } + +constexpr char *hexbytes2ascii(const char bytes[], const unsigned int length){ + + char asciiVal[2*length+1] {}; + for (unsigned int i = 0, j = 0; i < length; i++) { + + const uint8_t hexbyte = bytes[i]; + asciiVal[j++] = hex2ascii(hiNibble(hexbyte)); + asciiVal[j++] = hex2ascii(loNibble(hexbyte)); + + } + return strdup(asciiVal); +} XrdCephXAttr::XrdCephXAttr() {} @@ -99,16 +142,35 @@ int XrdCephXAttr::List(AList **aPL, const char *Path, int fd, int getSz) { int XrdCephXAttr::Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd, int isNew) { + + int rc = 0; + if (fd >= 0) { - return ceph_posix_fsetxattr(fd, Aname, Aval, Avsz, 0); + rc = ceph_posix_fsetxattr(fd, Aname, Aval, Avsz, 0); } else { try { - return ceph_posix_setxattr(0, Path, Aname, Aval, Avsz, 0); + rc = ceph_posix_setxattr(0, Path, Aname, Aval, Avsz, 0); } catch (std::exception &e) { XrdCephXattrEroute.Say("Set : invalid syntax in file parameters", Path); - return -EINVAL; + rc = -EINVAL; } } + + 
if (0 == rc && !strcmp(Aname, "XrdCks.adler32") ) { + + + auto *cks = (XrdCksData *)Aval; + auto cksAscii = (const char*)hexbytes2ascii(cks->Value, cks->Length); + XrdCephXattrEroute.Say("readback checksum = ", cksAscii); + fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), Path, "readback", "adler32", cksAscii); + fflush(g_cksLogFile); +/* + char cksBuff[8+1];// sprintf(cksBuff, "%08x", (uint8_t)*(cks->Value));// Produces "00000084" + fflush(g_cksLogFile); +*/ + } + + return rc; } XrdVERSIONINFO(XrdSysGetXAttrObject, XrdCephXAttr); From 8b942f29f6197fefbfce9c34b63fd4abbdb6da99 Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Thu, 13 Mar 2025 11:37:05 +0000 Subject: [PATCH 06/10] [XrdCeph] Open checksum record file once, in XrdCephPosix.cc . Do not log uninitialised checksum value when client reads file. Move recording of readback checksum value into XrdCephPosix instead of Set() method in XrdCephXAttr.cc. More checks for writable checksum record file --- src/XrdCeph/XrdCephOss.cc | 48 +++++++++--------- src/XrdCeph/XrdCephPosix.cc | 99 ++++++++++++++++++++----------------- src/XrdCeph/XrdCephPosix.hh | 1 + src/XrdCeph/XrdCephXAttr.cc | 57 +-------------------- 4 files changed, 81 insertions(+), 124 deletions(-) diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc index f5d7223c1c0..fd2650d4fca 100644 --- a/src/XrdCeph/XrdCephOss.cc +++ b/src/XrdCeph/XrdCephOss.cc @@ -127,6 +127,7 @@ ssize_t getNumericAttr(const char* const path, const char* attrName, const int m return retval; } +char *g_cksLogFileName; extern FILE *g_cksLogFile; @@ -385,13 +386,8 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { } if (!strcmp(var, "ceph.streamed-cks-adler32")) { // Streaming Adler32 checksum - - char hostname[32]; - (void)gethostname(hostname, 31); - Eroute.Emsg("Hostname is ", hostname); - var = Config.GetWord(); + var = Config.GetWord(); if (var) { - /* * Currently, actions are simply additive: * @@ -422,27 +418,21 @@ int 
XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { g_storeStreamedAdler32 = true; } - if (g_logStreamedAdler32) { - const char *cksLogFilename = "/var/log/xrootd/checksums/checksums.log"; - g_cksLogFile = fopen(cksLogFilename, "a"); - if (NULL == g_cksLogFile) { - g_logStreamedAdler32 = false; - Eroute.Emsg("Config cannot open file for logging checksum values and pathnames: ", cksLogFilename); - } else { - Eroute.Emsg("Opened file for logging checksum values and pathname: ", cksLogFilename); - } - } } }// "ceph.streamed-cks-adler32" -/* - if (!strcmp(var, "ceph.streamed-cks-adler32-log")) { - var = Config.GetWord(); - // Test path for being writable after exiting parse loop... - // - g_logStreamedAdler32StreamedPath = strdup(var); - } -*/ + if (!strcmp(var, "ceph.streamed-cks-logfile") ) { + var = Config.GetWord(); + if (var) { + g_cksLogFileName = strdup(var); + } else { + const char *defLogFileName = "/tmp/checksums.log"; // To-DO: Move defLogFileName so it can also be used as fallback + // when attempt to open specified log file below fails + Eroute.Emsg("Config", "Missing value for ceph.streamed-cks-logfile in config file, setting to default = ", defLogFileName); + g_cksLogFileName = strdup(defLogFileName); + return 1; + } + }// "ceph.streamed-cks-logfile" } // while @@ -455,6 +445,16 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { } Config.Close(); + if (g_logStreamedAdler32) { + if (NULL == (g_cksLogFile = fopen(g_cksLogFileName, "a"))) { + g_logStreamedAdler32 = false; + Eroute.Emsg("Config: ", "cannot open file for logging checksum values and pathname", g_cksLogFileName); + return 1; + } else { + Eroute.Emsg("Config: ", "Opened file for logging checksum values and pathname: ", g_cksLogFileName); + } + } + } return NoGo; } diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 0a6feade611..c80a6d1b379 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -58,8 +58,6 @@ 
#include -//#include "XrdCeph/XrdCephGlobals.hh" - char *ts_rfc3339() { std::time_t now = std::time({}); @@ -69,7 +67,22 @@ char *ts_rfc3339() { return strdup(timeString); } +constexpr char hex2ascii(char nibble) { return (0<= nibble && nibble<=9) ? nibble+'0' : nibble-10+'a'; } +constexpr char hiNibble(uint8_t hexbyte) { return (hexbyte & 0xf0) >> 4; } +constexpr char loNibble(uint8_t hexbyte) { return (hexbyte & 0x0f); } + +constexpr char *hexbytes2ascii(const char bytes[], const unsigned int length){ + char asciiVal[2*length+1] {}; + for (unsigned int i = 0, j = 0; i < length; i++) { + + const uint8_t hexbyte = bytes[i]; + asciiVal[j++] = hex2ascii(hiNibble(hexbyte)); + asciiVal[j++] = hex2ascii(loNibble(hexbyte)); + + } + return strdup(asciiVal); +} using namespace std; @@ -85,21 +98,6 @@ int setXrdCksAttr(const int fd, const char* cstype, const char* ckSumbuf) { return rc; } - -std::vector checksumData(const char* algName, const int algLen, const char* ckBuf) { - - XrdCksData xd; - xd.Set(algName); - xd.Set(ckBuf, algLen); - xd.fmTime = time(0); - xd.csTime = xd.fmTime; - - auto attrData = std::vector( (char *)&xd, ((char *)&xd)+sizeof(xd)); - - return attrData; - -} - /// small struct for directory listing struct DirIterator { librados::NObjectIterator m_iterator; @@ -158,13 +156,11 @@ std::map g_idxCntr; //IJJ: Actions for Adler32 checksum extern bool g_calcStreamedAdler32; +bool g_calcStreamedAdler32 = false; extern bool g_logStreamedAdler32; -extern bool g_storeStreamedAdler32; -======= -bool g_calcStreamedAdler32; -bool g_logStreamedAdler32; -bool g_storeStreamedAdler32; ->>>>>>> 0aed50f31 ([XrdCeph] swap extern use to pass compilation) +bool g_logStreamedAdler32 = false; +extern bool g_storeStreamedAdler32; +bool g_storeStreamedAdler32 = false; FILE *g_cksLogFile; @@ -806,10 +802,12 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode } } } + // At this point, we know either the target file didn't exist, or the 
ceph_posix_unlink above removed it if (g_calcStreamedAdler32) { fr.adler32 = adler32(0L, Z_NULL, 0); } + fr.writingData = true; int fd = insertFileRef(fr); logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname); return fd; @@ -856,26 +854,29 @@ int ceph_posix_close(int fd) { fr->asyncRdCompletionCount, fr->asyncRdStartCount, fr->bytesWritten, fr->maxOffsetWritten, fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge)); - if (g_calcStreamedAdler32) { + if (fr->writingData) { + if (g_calcStreamedAdler32) { const char* adler32str = formatAdler32(fr->adler32); - logwrapper((char*)"ceph_close: Adler32 streamed checksum = %s", adler32str); + logwrapper((char*)"ceph_close: fd: %d, Adler32 streamed checksum = %s", fd, adler32str); - if (g_logStreamedAdler32) { - const char *timestamp = ts_rfc3339(); // "2025-02-24 13:09:01+00:00" - const char *path = (fr->pool + ":" + fr->name).c_str(); - const char *cksMode = "streamed"; - const char *cksType = "adler32"; - const char *cksValue = adler32str; + if (g_logStreamedAdler32) { - fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", timestamp, path, cksMode, cksType, cksValue); - fflush(g_cksLogFile); - } + const char *timestamp = ts_rfc3339(); // "2025-02-24 13:09:01+00:00" + const char *path = (fr->pool + ":" + fr->name).c_str(); + const char *cksMode = "streamed"; + const char *cksType = "adler32"; + const char *cksValue = adler32str; + + fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", timestamp, path, cksMode, cksType, cksValue); + fflush(g_cksLogFile); + } - if (g_storeStreamedAdler32) { - int rc = setXrdCksAttr(fd, "adler32", adler32str); + if (g_storeStreamedAdler32) { + int rc = setXrdCksAttr(fd, "adler32", adler32str); - if (rc != 0) { - logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); + if (rc != 0) { + logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); + } } } } @@ -941,12 +942,9 @@ ssize_t ceph_posix_write(int 
fd, const void *buf, size_t count) { fr->wrcount++; fr->bytesWritten+=count; if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten); - if (g_calcStreamedAdler32) { + if (fr->writingData && g_calcStreamedAdler32) { fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); - } else { - logwrapper((char*)"ceph_write: g_calcAdler is false!"); } - return count; } else { return -EBADF; @@ -976,7 +974,7 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) fr->bytesWritten+=count; if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten); - if (g_calcStreamedAdler32) { + if (fr->writingData && g_calcStreamedAdler32) { fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); } @@ -1468,8 +1466,21 @@ static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char* na ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path, const char* name, const void* value, size_t size, int flags) { + int rc; + + auto *cks = (XrdCksData*)value; logwrapper((char*)"ceph_setxattr: path %s name=%s value=%s", path, name, value); - return ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags); + rc = ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags); + + if (0 == rc && !strcmp(name, "XrdCks.adler32") && g_logStreamedAdler32) { + + auto cksAscii = (const char*)hexbytes2ascii(cks->Value, cks->Length); + logwrapper((char*)"readback checksum = %s", cksAscii); + fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "readback", "adler32", cksAscii); + fflush(g_cksLogFile); + + } + return rc; } int ceph_posix_fsetxattr(int fd, diff --git a/src/XrdCeph/XrdCephPosix.hh b/src/XrdCeph/XrdCephPosix.hh index 8e0c7df35c8..8ceb92af352 100644 --- a/src/XrdCeph/XrdCephPosix.hh +++ b/src/XrdCeph/XrdCephPosix.hh @@ -131,6 +131,7 @@ struct CephFileRef : CephFile { double longestCallbackInvocation; uLong adler32; uLong 
crc32; + bool writingData; }; #endif // __XRD_CEPH_POSIX__ diff --git a/src/XrdCeph/XrdCephXAttr.cc b/src/XrdCeph/XrdCephXAttr.cc index 3218ffa2254..b56a6fc5b03 100644 --- a/src/XrdCeph/XrdCephXAttr.cc +++ b/src/XrdCeph/XrdCephXAttr.cc @@ -31,20 +31,6 @@ #include "XrdCks/XrdCksData.hh" #include -//#include "XrdCeph/XrdCephGlobals.hh" - -FILE *g_cksLogFile; - -char *ts_rfc3339() { - - std::time_t now = std::time({}); - char timeString[std::size("yyyy-mm-dd hh:mm:ss")]; - std::strftime(std::data(timeString), std::size(timeString), - "%F %TZ", std::gmtime(&now)); - return strdup(timeString); -} - - XrdSysError XrdCephXattrEroute(0); XrdOucTrace XrdCephXattrTrace(&XrdCephXattrEroute); @@ -67,34 +53,9 @@ extern "C" return 0; } - const char *cksLogFilename = "/var/log/xrootd/checksums/checksums.log"; - g_cksLogFile = fopen(cksLogFilename, "a"); - if (NULL == g_cksLogFile) { - XrdCephXattrEroute.Emsg("Config cannot open file for logging checksum values and pathnames: ", cksLogFilename); - } else { - XrdCephXattrEroute.Emsg("Opened file for logging checksum values and pathname: ", cksLogFilename); - } return new XrdCephXAttr(); } } -extern bool g_storeStreamedAdler32; - -constexpr char hex2ascii(char nibble) { return (0<= nibble && nibble<=9) ? 
nibble+'0' : nibble-10+'a'; } -constexpr char hiNibble(uint8_t hexbyte) { return (hexbyte & 0xf0) >> 4; } -constexpr char loNibble(uint8_t hexbyte) { return (hexbyte & 0x0f); } - -constexpr char *hexbytes2ascii(const char bytes[], const unsigned int length){ - - char asciiVal[2*length+1] {}; - for (unsigned int i = 0, j = 0; i < length; i++) { - - const uint8_t hexbyte = bytes[i]; - asciiVal[j++] = hex2ascii(hiNibble(hexbyte)); - asciiVal[j++] = hex2ascii(loNibble(hexbyte)); - - } - return strdup(asciiVal); -} XrdCephXAttr::XrdCephXAttr() {} @@ -142,7 +103,6 @@ int XrdCephXAttr::List(AList **aPL, const char *Path, int fd, int getSz) { int XrdCephXAttr::Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd, int isNew) { - int rc = 0; if (fd >= 0) { @@ -152,24 +112,9 @@ int XrdCephXAttr::Set(const char *Aname, const void *Aval, int Avsz, rc = ceph_posix_setxattr(0, Path, Aname, Aval, Avsz, 0); } catch (std::exception &e) { XrdCephXattrEroute.Say("Set : invalid syntax in file parameters", Path); - rc = -EINVAL; + rc = -EINVAL; } } - - if (0 == rc && !strcmp(Aname, "XrdCks.adler32") ) { - - - auto *cks = (XrdCksData *)Aval; - auto cksAscii = (const char*)hexbytes2ascii(cks->Value, cks->Length); - XrdCephXattrEroute.Say("readback checksum = ", cksAscii); - fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), Path, "readback", "adler32", cksAscii); - fflush(g_cksLogFile); -/* - char cksBuff[8+1];// sprintf(cksBuff, "%08x", (uint8_t)*(cks->Value));// Produces "00000084" - fflush(g_cksLogFile); -*/ - } - return rc; } From 2533629489b0f8d28e409c20221d30bd4fa0ee9f Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Sun, 16 Mar 2025 20:08:50 +0000 Subject: [PATCH 07/10] [XrdCeph]Use strdup to ensure that concatenation of C String .c_str() works correctly --- src/XrdCeph/XrdCephPosix.cc | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index c80a6d1b379..744fb617bd8 
100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -73,7 +73,7 @@ constexpr char loNibble(uint8_t hexbyte) { return (hexbyte & 0x0f); } constexpr char *hexbytes2ascii(const char bytes[], const unsigned int length){ - char asciiVal[2*length+1] {}; + char asciiVal[9] {}; for (unsigned int i = 0, j = 0; i < length; i++) { const uint8_t hexbyte = bytes[i]; @@ -860,14 +860,8 @@ int ceph_posix_close(int fd) { logwrapper((char*)"ceph_close: fd: %d, Adler32 streamed checksum = %s", fd, adler32str); if (g_logStreamedAdler32) { - - const char *timestamp = ts_rfc3339(); // "2025-02-24 13:09:01+00:00" - const char *path = (fr->pool + ":" + fr->name).c_str(); - const char *cksMode = "streamed"; - const char *cksType = "adler32"; - const char *cksValue = adler32str; - - fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", timestamp, path, cksMode, cksType, cksValue); + const char *path = strdup((fr->pool + ":" + fr->name).c_str()); + fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "streamed", "adler32", adler32str); fflush(g_cksLogFile); } From 4f39098bac4c52200a801b0efd8e480415a58c56 Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Mon, 17 Mar 2025 11:16:38 +0000 Subject: [PATCH 08/10] [XrdCeph] Remove duplicate definition of ts_rfc3339 in XrdCephOss --- src/XrdCeph/XrdCephOss.cc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc index fd2650d4fca..c6e043c0bc1 100644 --- a/src/XrdCeph/XrdCephOss.cc +++ b/src/XrdCeph/XrdCephOss.cc @@ -131,17 +131,6 @@ char *g_cksLogFileName; extern FILE *g_cksLogFile; - -char *ts_rfc3339() { - - std::time_t now = std::time({}); - char timeString[std::size("yyyy-mm-dd hh:mm:ss")]; - std::strftime(std::data(timeString), std::size(timeString), - "%F %TZ", std::gmtime(&now)); - return strdup(timeString); -} - - extern "C" { XrdOss* From c61419b0d751e181591917f190d985574e9078dd Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Wed, 26 Mar 2025 11:25:46 +0000 
Subject: [PATCH 09/10] [XrdCeph] Move to use XRootD's own checksum calculator class rather than using zlib directly Move from using zlib adler32() function to XrdCks/XrdCksCalcadler32. The superclass, XrdCksCalc, provides a uniform interface to all supported checksum algorithms, and is the preferred method in XRootD. Remove an unnecessary guard condition in the ceph_posix_*write() methods. --- src/XrdCeph/XrdCephPosix.cc | 45 ++++++++++++++++++++----------------- src/XrdCeph/XrdCephPosix.hh | 5 ++--- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 744fb617bd8..c867d196b9a 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -805,7 +805,8 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode // At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it if (g_calcStreamedAdler32) { - fr.adler32 = adler32(0L, Z_NULL, 0); + fr.cksCalcadler32 = new XrdCksCalcadler32(); + fr.cksCalcadler32->Init(); } fr.writingData = true; int fd = insertFileRef(fr); @@ -819,17 +820,14 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode const char* formatAdler32(unsigned long adler32) { -#define ADLER32_LENGTH 8 - - char ckBuf[ADLER32_LENGTH+1]; - snprintf(ckBuf, ADLER32_LENGTH+1, "%0*lx", ADLER32_LENGTH, adler32); - ckBuf[ADLER32_LENGTH] = '\0'; - - return (const char*)strdup(ckBuf); - +#ifndef Xrd_Big_Endian + adler32 = htonl(adler32); +#endif + char adler32Cks[8+1]; + sprintf(adler32Cks, "%08lx", adler32); + return (const char*)strdup(adler32Cks); } - int ceph_posix_close(int fd) { CephFileRef* fr = getFileRef(fd); if (fr) { @@ -856,18 +854,21 @@ int ceph_posix_close(int fd) { if (fr->writingData) { if (g_calcStreamedAdler32) { - const char* adler32str = formatAdler32(fr->adler32); - logwrapper((char*)"ceph_close: fd: %d, Adler32 streamed checksum = %s", fd, adler32str);
+ unsigned long adlerULong; + memcpy((&adlerULong), fr->cksCalcadler32->Final(), 4); + const char* adler32Cks = formatAdler32(adlerULong); + + logwrapper((char*)"ceph_close: fd: %d, Adler32 streamed checksum = %s", fd, adler32Cks); if (g_logStreamedAdler32) { const char *path = strdup((fr->pool + ":" + fr->name).c_str()); - fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "streamed", "adler32", adler32str); + fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "streamed", "adler32", adler32Cks); fflush(g_cksLogFile); } if (g_storeStreamedAdler32) { - int rc = setXrdCksAttr(fd, "adler32", adler32str); - + int rc = setXrdCksAttr(fd, "adler32", adler32Cks); if (rc != 0) { logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); } @@ -936,8 +937,8 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) { fr->wrcount++; fr->bytesWritten+=count; if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten); - if (fr->writingData && g_calcStreamedAdler32) { - fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); + if (g_calcStreamedAdler32) { + fr->cksCalcadler32->Update((const char*)buf, count); } return count; } else { @@ -968,8 +969,8 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) fr->bytesWritten+=count; if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten); - if (fr->writingData && g_calcStreamedAdler32) { - fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); + if (g_calcStreamedAdler32) { + fr->cksCalcadler32->Update((const char*)buf, count); } return count; @@ -1047,7 +1048,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { ::gettimeofday(&fr->lastAsyncSubmission, nullptr); fr->bytesAsyncWritePending+=count; if (g_calcStreamedAdler32) { - fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count); + fr->cksCalcadler32->Update((const char*)buf, count); } return rc; 
} else { @@ -1467,7 +1468,9 @@ ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path, rc = ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags); if (0 == rc && !strcmp(name, "XrdCks.adler32") && g_logStreamedAdler32) { - +// +// We know that streamed checksums use ceph_posix_fsetxattr below, so this must be a readback checksum +// auto cksAscii = (const char*)hexbytes2ascii(cks->Value, cks->Length); logwrapper((char*)"readback checksum = %s", cksAscii); fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "readback", "adler32", cksAscii); diff --git a/src/XrdCeph/XrdCephPosix.hh b/src/XrdCeph/XrdCephPosix.hh index 8ceb92af352..1dfee919527 100644 --- a/src/XrdCeph/XrdCephPosix.hh +++ b/src/XrdCeph/XrdCephPosix.hh @@ -39,7 +39,7 @@ #include "XrdSys/XrdSysPthread.hh" #include "XrdOuc/XrdOucIOVec.hh" -#include +#include // simple logging for XrdCeph buffering code #define XRDCEPHLOGLEVEL 1 @@ -129,9 +129,8 @@ struct CephFileRef : CephFile { ::timeval lastAsyncSubmission; double longestAsyncWriteTime; double longestCallbackInvocation; - uLong adler32; - uLong crc32; bool writingData; + XrdCksCalcadler32 *cksCalcadler32; }; #endif // __XRD_CEPH_POSIX__ From c5ec40be716accc590c98d2b504770a983fcc5e7 Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Thu, 27 Mar 2025 10:12:44 +0000 Subject: [PATCH 10/10] [XrdCeph] Add a delete call on XrdCksCalcadler32 object obtained by new The XrdCksCalc object in the CephFileRef structure needs a matching delete in ceph_posix_close() --- src/XrdCeph/XrdCephPosix.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index c867d196b9a..ff2dcec0a43 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -873,6 +873,7 @@ int ceph_posix_close(int fd) { logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); } } + delete fr->cksCalcadler32; } }