arXiv:1401.0702v12 [cs.DS] 19 Sep 2015

A parallel space saving algorithm for frequent items and the Hurwitz zeta distribution

Massimo Cafaro (a,∗), Marco Pulimeno (a), Piergiulio Tempesta (b)

(a) University of Salento, Lecce, Italy
(b) Departamento de Física Teórica II, Facultad de Físicas, Universidad Complutense, 28040 Madrid, Spain and Instituto de Ciencias Matemáticas, C/ Nicolás Cabrera, No 13–15, 28049 Madrid, Spain

Abstract

We present a message-passing based parallel version of the Space Saving algorithm designed to solve the k–majority problem. The algorithm determines in parallel the frequent items, i.e., those whose frequency is greater than a given threshold, and is therefore useful for iceberg queries and in many other contexts. We apply our algorithm to the detection of frequent items in both real and synthetic datasets whose probability distribution functions are a Hurwitz and a Zipf distribution respectively. We also compare its parallel performance and accuracy against a parallel algorithm recently proposed for merging summaries derived by the Space Saving or Frequent algorithms.

Keywords: Frequent items, Space saving algorithm, Message–passing.

1. Introduction

Discovering frequent items is a data mining problem that has attracted many researchers, owing to its relevance to applications in several domains. Depending on the specific application, the problem is also known in the literature as hot list analysis [20], market basket analysis [6] and iceberg query [17], [3]. Additional applications include network traffic analysis [14], [16], [31], the analysis of web logs [8], computational and theoretical linguistics [19], ecological field studies [30], etc.

Several sequential solutions have been provided. In their survey [10], Cormode and Hadjieleftheriou classify existing algorithms as being either counter or sketch based.
Misra and Gries [28] proposed the first counter–based sequential algorithm, which was later rediscovered independently by Demaine et al. [14] and Karp et al. [24]. Recently designed counter–based algorithms include Lossy Counting [26] and Space Saving [27]. In particular, Space Saving has been shown to be the most efficient and accurate algorithm among counter–based ones [10], which motivates our choice of designing a parallel version of this algorithm. Notable sketch–based solutions are CountSketch [8] and CountMin [11].

In the parallel setting, we presented in [7] a message-passing based parallel version of the Frequent algorithm, whilst [40] presents a shared-memory parallel version. A parallel version of the Lossy Counting algorithm has been proposed in [39]. Parallel versions of the Space Saving algorithm for shared-memory architectures have been designed in [33] and [13]. GPU (Graphics Processing Unit) accelerated algorithms for frequent items appeared in [21] and [15]. Novel shared-memory parallel algorithms for frequent items were recently proposed in [35].

∗ Corresponding author.
Email addresses: massimo.cafaro@unisalento.it (Massimo Cafaro), marco.pulimeno@unisalento.it (Marco Pulimeno), p.tempesta@fis.ucm.es (Piergiulio Tempesta)
Preprint submitted to Elsevier, September 22, 2015. http://arxiv.org/abs/1401.0702v12

A similar problem, mining frequent itemsets, is strongly related to the problem of association pattern mining, which originated from the analysis of market basket data. Such datasets are basically sets of items bought by customers, traditionally referred to as transactions. Association pattern mining entails discovering the so-called association rules between sets of items. Frequent itemsets (also known in the literature as frequent patterns) are those sets of items determined by the mining process; association rules are simple implications stated as $A \Rightarrow B$, in which both $A$ and $B$ are sets of items.
Among the many algorithms that have appeared in the literature, we recall here recent work including [29], [38], [25] and [37].

In this paper, we investigate how to parallelize the Space Saving algorithm, and we design our algorithm in the context of message–passing architectures. To the best of our knowledge, this is the first parallel version of the Space Saving algorithm for message-passing architectures. Therefore it is the only one that can solve arbitrarily large problems on millions of cores, owing to intrinsic hardware limits of shared-memory architectures that prevent SMP (Symmetric Multi-Processing) nodes from scaling to higher processor counts. Indeed, current SMP nodes are equipped with at most a few dozen processors. We prove the correctness of the algorithm, and then analyze its parallel complexity, proving its cost–optimality for $k = O(1)$.

Another original aspect of this work is that we apply our algorithm to the study of frequent items in datasets whose probability distribution function is a Hurwitz distribution. This distribution generalizes the classical Zipf distribution and is based on a well-known generalization of the Riemann zeta function, i.e., the Hurwitz function. We shall show that our parallel algorithm is especially suitable for treating this kind of dataset. We stress that the relevance of the Hurwitz distribution is very general. Indeed, the presence of an extra parameter makes it a more flexible tool than the classical Zipf one. To the best of our knowledge, this work offers the first example of application of the Hurwitz distribution in dataset analysis.

Before stating the problem solved by our algorithm, we need to recall a few basic definitions from multiset theory [34]. We shall use a calligraphic capital letter to denote a multiset, and the corresponding capital Greek letter to denote its underlying set. We are given a dataset $\mathcal{N}$ consisting of $n$ elements, and an integer $k$, with $2 \leq k \leq n$.

Definition 1.
A multiset $\mathcal{N} = (N, f_{\mathcal{N}})$ is a pair where $N$ is some set, called the underlying set of elements, and $f_{\mathcal{N}} : N \to \mathbb{N}$ is a function. The generalized indicator function of $\mathcal{N}$ is

$$I_{\mathcal{N}}(x) := \begin{cases} f_{\mathcal{N}}(x) & x \in N, \\ 0 & x \notin N, \end{cases} \tag{1}$$

where the integer–valued function $f_{\mathcal{N}}$, for each $x \in N$, provides its frequency (or multiplicity), i.e., the number of occurrences of $x$ in $\mathcal{N}$. The cardinality of $\mathcal{N}$ is expressed by

$$|\mathcal{N}| := \mathrm{Card}(\mathcal{N}) = \sum_{x \in N} I_{\mathcal{N}}(x), \tag{2}$$

whilst the cardinality of the underlying set $N$ is

$$|N| := \mathrm{Card}(N) = \sum_{x \in N} 1. \tag{3}$$

A multiset (also called a bag) is essentially a set where the duplication of elements is allowed. In the sequel, $\mathcal{N}$ will play the role of a finite input array, containing $n$ elements. We can now state the problem formally.

Definition 2. Given a multiset $\mathcal{N}$, with $|\mathcal{N}| = n$, a k–majority element (or frequent item) is an element $x \in N$ whose frequency $f_{\mathcal{N}}(x)$ is such that $f_{\mathcal{N}}(x) \geq \lfloor n/k \rfloor + 1$.

Statement of the problem. The k–majority problem takes as input an array $\mathcal{N}$ of $n$ numbers, and requires as output the set

$$W = \left\{ x \in N : f_{\mathcal{N}}(x) \geq \left\lfloor \frac{n}{k} \right\rfloor + 1 \right\}.$$

Therefore, the k–majority problem entails finding the set of elements whose frequency is greater than the threshold $\lfloor n/k \rfloor$ controlled by the parameter $k$. It is worth noting here that when $k = 2$, the problem reduces to the well known majority problem [4], [18].

This article is organized as follows. We recall the sequential Space Saving algorithm in Section 2. Our parallel space saving algorithm is presented in Section 3. We prove its correctness in Section 4, then analyze it and prove its cost–optimality for $k = O(1)$ in Section 5. In the Appendix we provide and discuss experimental results concerning the application of our algorithm to both real and synthetic datasets governed by a Zipf–Mandelbrot and by a Hurwitz distribution. In particular, we also compare our algorithm with another parallel algorithm designed and implemented by us starting from a sequential algorithm by Agarwal et al. [1]. Finally, we draw our conclusions in Section 6.

2. The space saving algorithm

We recall here a few basic facts related to the sequential Space Saving algorithm that will be used later. The algorithm uses exactly $k$ counters in order to solve the k-majority problem sequentially, and allows estimating the maximum error committed when computing the frequency of an item. Space Saving works as described by the pseudocode of Algorithm 1. We denote by $S[i].e$, $S[i].\hat{f}$ and $S[i].\hat{\varepsilon}$ respectively the element monitored by the $i$th counter of $S$, the corresponding estimated frequency and the error committed in the estimation.

When processing an item which is already monitored by a counter, its estimated frequency is incremented by one. When processing an item which is not already monitored by one of the available counters, there are two possibilities. If a counter is available, it will be in charge of monitoring the item and its estimated frequency is set to one. Otherwise, if all of the counters are already occupied (their frequencies are different from zero), the counter storing the item with minimum frequency is incremented by one. Then the monitored item is evicted from the counter and replaced by the new item. This is justified because an item which is not monitored cannot have occurred in the input more times than the minimum frequency. The algorithm assumes that the item has occurred exactly as many times as the frequency stored by the minimum counter, thus overestimating its frequency and introducing an error which is at most the minimum frequency. We keep track of this error, as done in [27], by storing for each monitored item its error $\hat{\varepsilon}$.

Let $\mathcal{N} = (N, f_{\mathcal{N}})$ be the input multiset and $\mathcal{S} = (\Sigma, \hat{f}_{\mathcal{S}})$ the multiset of all of the monitored items and their respective counters at the end of the sequential Space Saving algorithm's execution, i.e., the algorithm's summary data structure.
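The update rule described above can be sketched in Python as follows. This is only a minimal illustration under stated assumptions, not the authors' implementation: the names `Entry` and `space_saving` are hypothetical, the summary is a plain dictionary, and the minimum counter is found by a linear scan rather than with the stream summary data structure of [27].

```python
from dataclasses import dataclass


@dataclass
class Entry:
    freq: int = 0  # estimated frequency of the monitored item
    err: int = 0   # error committed in the estimation


def space_saving(items, k):
    """Return a dict item -> Entry monitoring at most k items."""
    summary = {}
    for x in items:
        if x in summary:
            # Item already monitored: increment its estimated frequency.
            summary[x].freq += 1
        elif len(summary) < k:
            # A counter is still available: start monitoring the item.
            summary[x] = Entry(freq=1, err=0)
        else:
            # All counters occupied: evict the item with minimum frequency
            # (linear scan, an assumption made here for brevity).
            victim = min(summary, key=lambda e: summary[e].freq)
            m = summary.pop(victim).freq
            # The new item inherits the minimum as its error and is
            # credited with the minimum plus one occurrence.
            summary[x] = Entry(freq=m + 1, err=m)
    return summary


if __name__ == "__main__":
    stream = [1, 1, 2, 3, 1, 4, 1, 2]
    for item, entry in space_saving(stream, k=2).items():
        print(item, entry.freq, entry.err)
```

On this short stream the stored frequencies add up to the stream length, and each stored frequency is an upper bound on the true count of the monitored item.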
Let $|\mathcal{S}|$ be the sum of the frequencies stored in the counters, $f_{\mathcal{N}}(e)$ the exact frequency of an item $e$, $\hat{f}_{\mathcal{S}}(e)$ its estimated frequency, $\hat{f}^{\min}_{\mathcal{S}}$ the minimum frequency in $\mathcal{S}$ and $\hat{\varepsilon}_{\mathcal{S}}(e)$ the error of item $e$, i.e., an over-estimation of the difference between the estimated and exact frequency. It is worth noting here that $\hat{f}^{\min}_{\mathcal{S}} = 0$ when $|\Sigma| < k$. The following relations hold (as proved in [27]) for each item $e \in N$:

$$|\mathcal{S}| = |\mathcal{N}|, \tag{4}$$

$$\hat{f}_{\mathcal{S}}(e) - \hat{f}^{\min}_{\mathcal{S}} \leq \hat{f}_{\mathcal{S}}(e) - \hat{\varepsilon}_{\mathcal{S}}(e) \leq f_{\mathcal{N}}(e) \leq \hat{f}_{\mathcal{S}}(e), \quad e \in \Sigma, \tag{5}$$

$$f_{\mathcal{N}}(e) \leq \hat{f}^{\min}_{\mathcal{S}}, \quad e \notin \Sigma, \tag{6}$$

$$\hat{f}^{\min}_{\mathcal{S}} \leq \left\lfloor \frac{|\mathcal{N}|}{k} \right\rfloor. \tag{7}$$

If an item $e$, at the end of the algorithm's execution, has an estimated frequency $\hat{f}_{\mathcal{S}}(e)$ less than the required threshold, $e$ can be excluded from the output, since it cannot be a frequent item. Instead, if we keep track of the error $\hat{\varepsilon}_{\mathcal{S}}(e)$ and $\hat{f}_{\mathcal{S}}(e) - \hat{\varepsilon}_{\mathcal{S}}(e)$ is greater than or equal to the threshold, then $e$ is certainly a frequent item. All of the other output items are only potential frequent items.

3. A parallel space saving algorithm

The pseudocode of Algorithm 2 describes our parallel Space Saving algorithm. We assume that the input array $\mathcal{N}$ is initially read by an application calling our function implementing the algorithm; for instance, every process reads the input from a file, or a designated process reads it and broadcasts it to the other processes. The initial call is ParallelSpaceSaving($\mathcal{N}$, $n$, $p$, $k$), where $\mathcal{N}$ consists of $n$ elements, $p$ is the number of processors (or cores) we use in parallel and $k$ is the k-majority parameter. Each processor is assigned a unique rank; ranks are numbered from 0 to $p - 1$. The algorithm determines in parallel the k–majority candidates. We recall here that some of the candidates returned may be false positives, as in the sequential counterpart.

The algorithm works as follows. In the initial domain decomposition, each processor determines the indices of the first and last element of its block by applying a simple block distribution, in which each processor is responsible for either $\lfloor n/p \rfloor$ or $\lceil n/p \rceil$ elements.
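A block distribution of this kind can be sketched as below. The function name `block_bounds` and the particular index formula are assumptions made for illustration (one common choice for ranks numbered from 0); any formula yielding contiguous blocks of $\lfloor n/p \rfloor$ or $\lceil n/p \rceil$ elements would serve equally well.

```python
def block_bounds(rank, n, p):
    """Inclusive (left, right) indices of the block owned by `rank`.

    Assumes ranks are numbered 0..p-1 and array indices 0..n-1.
    """
    left = (rank * n) // p
    right = ((rank + 1) * n) // p - 1
    return left, right


if __name__ == "__main__":
    n, p = 10, 4
    for rank in range(p):
        left, right = block_bounds(rank, n, p)
        print(rank, left, right, right - left + 1)
```

Both assignments take constant time, and consecutive ranks own adjacent, non-overlapping blocks that together cover the whole array.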
Then, each processor determines local, a stream summary data structure storing its local candidates, their corresponding estimated frequencies and errors, by using the well-known algorithm designed by Metwally et al. [27], shown in the pseudocode as the SpaceSaving function. A hash table hash is then built, storing the local candidates as keys and their corresponding counters (estimated frequencies and errors) as values. This hash table is then sorted in ascending order by counters' frequency and used as input for the parallel reduction, whose purpose is to determine global candidates for the whole array. This step is carried out by means of the ParallelReduction function, shown as Algorithm 3.

Algorithm 1 Space saving.
Require: N, an array; start, first index of N to be processed; end, last index of N to be processed; k, the k-majority parameter
Ensure: a summary containing k–majority candidate elements
procedure SpaceSaving(N, start, end, k)
    S ← InitializeCounters(k)
    for i = start to end do
        if N[i] is monitored then
            let S[l] be the counter of N[i]
            S[l].f̂ ← S[l].f̂ + 1
        else
            let S[m].e be the element with least hits
            S[m].e ← N[i]
            S[m].ε̂ ← S[m].f̂
            S[m].f̂ ← S[m].f̂ + 1
        end if
    end for
    return S
end procedure

Assuming that the parallel reduction returns the result to the processor whose rank is 0, that processor prunes the global candidates, removing all of the items below the threshold required to be frequent items, and returns the results. The Pruned function, which is not shown here to save space, is just a linear scan in which every item's frequency is compared against the threshold and, if the frequency is greater than or equal to the threshold, the item is returned in result as a potential frequent item.

The parallel reduction determines global candidates for the whole array and works as shown in Algorithm 3.
In each sub-step of the reduction, a processor receives as input from two processors $p_r$ and $p_s$ their hash tables, which from now on shall be called $\mathcal{S}_1$ and $\mathcal{S}_2$ respectively. These data structures contain local items as keys and their counters, storing estimated frequencies and errors, as values. For a generic summary $\mathcal{S}$, we denote by $\mathcal{S}.nz$ the number of items in $\mathcal{S}$, and by $S[i].e$, $S[i].\hat{f}$ and $S[i].\hat{\varepsilon}$ respectively the element monitored by the $i$th counter of $\mathcal{S}$, the corresponding estimated frequency and the error committed in the estimation.

The reduction works as follows. For both input summaries $\mathcal{S}_i$, $i = 1, 2$, we have $\mathcal{S}_i.nz \leq k$. We determine $m_1$ as the minimum among the frequencies of $\mathcal{S}_1$ if $\mathcal{S}_1.nz = k$, otherwise $m_1 = 0$. Similarly, we determine $m_2$ for $\mathcal{S}_2$. Then, we combine the two summaries by calling the COMBINE function, shown as pseudocode in Algorithm 4.

We scan the first hash table, and for each item in $\mathcal{S}_1$ we check whether the item also appears in $\mathcal{S}_2$ by calling the FIND function. If it does, we insert the entry for the item in $\mathcal{S}_C$, storing as its estimated frequency (respectively as its error) the sum of its frequency and the frequency of the corresponding item in $\mathcal{S}_2$ (respectively the sum of its error and the error of the corresponding item in $\mathcal{S}_2$), and remove the item from $\mathcal{S}_2$. Otherwise, we insert the entry for the item storing as its estimated frequency (respectively as its error) the sum of its frequency and the minimum $m_2$ (respectively the sum of its error and the minimum $m_2$).

We then scan the second hash table. Since each time an item in $\mathcal{S}_1$ was also present in $\mathcal{S}_2$ we removed that item from $\mathcal{S}_2$, now $\mathcal{S}_2$ contains only items that do not appear in $\mathcal{S}_1$. For each item in $\mathcal{S}_2$ we simply insert the item in $\mathcal{S}_C$, and in the corresponding counter we store as estimated frequency (respectively as its error) the sum of its frequency and the minimum $m_1$ (respectively the sum of its error and the minimum $m_1$). Finally, the entries in $\mathcal{S}_C$ are sorted by the counters' frequency and this hash table is returned.
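The combine step just described can be sketched in Python as follows. This is a hedged illustration, not the authors' code: summaries are represented as dictionaries mapping an item to a `(frequency, error)` pair, the name `combine` and this representation are assumptions, and instead of removing matched items from the second summary the sketch skips them during the second scan, which yields the same result without modifying the inputs.

```python
def combine(s1, s2, m1, m2):
    """Combine two summaries given as dicts item -> (freq, err).

    m1 and m2 are the minimum frequencies of s1 and s2, or 0 when the
    corresponding summary holds fewer than k items. Returns a list of
    (item, freq, err) triples sorted by ascending frequency.
    """
    combined = {}
    for item, (f, e) in s1.items():
        if item in s2:
            # Item monitored by both summaries: add frequencies and errors.
            f2, e2 = s2[item]
            combined[item] = (f + f2, e + e2)
        else:
            # Item only in s1: it may have occurred up to m2 times elsewhere.
            combined[item] = (f + m2, e + m2)
    for item, (f, e) in s2.items():
        if item not in s1:
            # Item only in s2: symmetric case, using the minimum of s1.
            combined[item] = (f + m1, e + m1)
    return sorted(((i, f, e) for i, (f, e) in combined.items()),
                  key=lambda t: t[1])


if __name__ == "__main__":
    s1 = {"a": (4, 0), "b": (4, 3)}   # full summary for k = 2, minimum 4
    s2 = {"a": (3, 0), "c": (2, 1)}   # full summary for k = 2, minimum 2
    print(combine(s1, s2, m1=4, m2=2))
```

With these inputs the combined summary holds three items, more than $k = 2$, which is why a further selection of $k$ entries is needed before the result is passed on.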
Note that for the summary $\mathcal{S}_C$ returned by the COMBINE function it holds that $\mathcal{S}_C.nz \leq 2k$. Indeed, $\mathcal{S}_C$ may contain up to $2k$ items in the worst case (i.e., when all of the items in $\mathcal{S}_1$ and $\mathcal{S}_2$ are different). However, we need to return at most $k$ items. Therefore, if $\mathcal{S}_C.nz \leq k$ (the number of entries with nonzero counter's frequency is at most $k$), we return $\mathcal{S}_C$ as $\mathcal{S}_M$. Otherwise, we remove the first $\mathcal{S}_C.nz - k$ items and then return $\mathcal{S}_C$ as $\mathcal{S}_M$, which contains exactly the $k$ items with the largest frequencies.

As an implementation detail, in the COMBINE function it is possible to avoid using the $\mathcal{S}_C$ hash table altogether. Indeed, when scanning $\mathcal{S}_1$ one can simply update the frequency of the current item being processed, and when scanning $\mathcal{S}_2$ each item can be inserted into $\mathcal{S}_1$. At the end, $\mathcal{S}_1$ is returned. However, we prefer to use $\mathcal{S}_C$ in the interest of clarity, noting that the overall space complexity of COMBINE is $O(k)$ in either case.

Algorithm 2 Parallel space saving.
Require: N, an array; n, the length of N; p, the number of processors; k, the k-majority parameter
Ensure: a hash table containing k–majority candidate elements
procedure ParallelSpaceSaving(N, n, p, k)
    ⊲ The n elements of the input array N are distributed to the p processors so that each one is responsible for either ⌊n/p⌋ or ⌈n/p⌉ elements; let left and right be respectively the indices of the first and last element of the sub-array handled by the process with rank id; ranks are numbered from 0 to p − 1
    left ← ⌊id · n/p⌋
    right ← ⌊(id + 1) · n/p⌋ − 1
    local ← SpaceSaving(N, left, right, k)    ⊲ determine local candidates
    let hash be a hash table storing <item, counter> pairs in local
    sort hash by counters' frequency in ascending order
    global ← ParallelReduction(hash, k)    ⊲ determine the global candidates for the whole array
    if id == 0 then    ⊲ we assume here that the processor with rank 0 holds the final result of the parallel reduction
        result ← Pruned(global, n, k)
        return result
    end if
end procedure

Algorithm 3 Parallel reduction for space saving summaries.
Require: S1, S2: hash tables ordered by counters' frequency; k: the k-majority parameter; the hash tables store pairs <item, counter>, a monitored item e is used as key and a counter c as object, including the estimated frequency c.f̂ and the error c.ε̂ of the item e
Ensure: a hash table, which is the merged summary SM
procedure ParallelReduction(S1, S2, k)    ⊲ Si.nz is the number of items in the hash table Si
    if S1.nz == k then
        let counter be the first counter in S1
        m1 ← counter.f̂
    else
        m1 ← 0
    end if
    if S2.nz == k then
        let counter be the first counter in S2
        m2 ← counter.f̂
    else
        m2 ← 0
    end if
    SC ← combine(S1, S2, m1, m2, k)
    if SC.nz ≤ k then
        return SC as SM
    else
        excess ← SC.nz − k
        remove first excess items from SC
        return SC as SM    ⊲ return the last k items
    end if
end procedure

Algorithm 4 Combine.
Require: S1, S2: hash tables ordered by counters' frequency; m1, the minimum of counters' frequency in S1; m2, the minimum of counters' frequency in S2; k, the k-majority parameter
Ensure: a hash table, which is the combined summary SC
procedure combine(S1, S2, m1, m2, k)
    let SC be an empty hash table
    for each entry in S1 do
        item ← entry.key
        counter ← entry.val
        found ← S2.Find(item)
        if found then
            newcounter.f̂ ← counter.f̂ + found.f̂
            newcounter.ε̂ ← counter.ε̂ + found.ε̂
            SC.Put(item, newcounter)
            S2.Remove(item)
        else
            newcounter.f̂ ← counter.f̂ + m2
            newcounter.ε̂ ← counter.ε̂ + m2
            SC.Put(item, newcounter)
        end if
    end for
    for each entry in S2 do
        item ← entry.key
        counter ← entry.val
        newcounter.f̂ ← counter.f̂ + m1
        newcounter.ε̂ ← counter.ε̂ + m1
        SC.Put(item, newcounter)
    end for
    sort SC by counters' frequency in ascending order
    return SC
end procedure

4. Correctness

In this Section we formally prove that our Parallel Space Saving Algorithm is correct when executed on $p$ processors. We decompose the original array (i.e., multiset) of data $\mathcal{N}$ into $p$ subarrays $\mathcal{N}_i$ ($i = 0, \ldots, p - 1$), namely $\mathcal{N} = \biguplus_i \mathcal{N}_i$. Here the $\uplus$ operator denotes the join operation [34], which is the sum of the frequency functions as follows: $I_{\mathcal{A} \uplus \mathcal{B}}(x) = I_{\mathcal{A}}(x) + I_{\mathcal{B}}(x)$. Let the sub–array $\mathcal{N}_i$ be assigned to the processor $p_i$, whose rank is denoted by id, with id $= 0, \ldots, p - 1$. Let also $|\mathcal{N}_i|$ denote the cardinality of $\mathcal{N}_i$, with $\sum_i |\mathcal{N}_i| = |\mathcal{N}| = n$.

The first step of the algorithm consists in the execution of the sequential Space Saving algorithm (which has already been proved to be correct by its authors) on the subarray assigned to each processor $p_i$. Therefore, in order to prove the overall correctness of the algorithm, we just need to demonstrate that the parallel reduction is correct. Our strategy is to prove that if a single sub-step of the parallel reduction is correct, then we can naturally extend the proof to the $O(\log p)$ steps of the whole parallel reduction.

We begin by proving a couple of preliminary results necessary for the proof of correctness of our parallel algorithm; both results are related to the combined summary $\mathcal{S}_C$ obtained by Algorithm 4. We present in Table 1 the notation used throughout this Section, and recall here that we use a calligraphic capital letter to denote a multiset, and the corresponding capital Greek letter to denote its underlying set.

Table 1: Notation

Notation                               Description
$\mathcal{L}$                          A generic multiset (input or summary)
$\Lambda$                              Underlying set of $\mathcal{L}$
$|\mathcal{L}|$                        Cardinality of $\mathcal{L}$
$|\Lambda|$                            Cardinality of the underlying set of $\mathcal{L}$
$\hat{f}_{\mathcal{L}}(e)$             Let $\mathcal{L}$ be a summary related to an input multiset $\mathcal{N}$; given an item $e \in \Lambda$, $\hat{f}_{\mathcal{L}}(e)$ is the estimated frequency of item $e$ in $\mathcal{N}$
$f_{\mathcal{L}}(e)$                   Exact frequency of item $e$ in $\mathcal{L}$, an input multiset
$\hat{\varepsilon}_{\mathcal{L}}(e)$   Estimated error of item $e$ in $\mathcal{L}$, a summary related to an input multiset
$\hat{f}^{\min}_{\mathcal{L}}$         Minimum of counters' frequency in $\mathcal{L}$; we let $\hat{f}^{\min}_{\mathcal{L}} = 0$ if $|\Lambda| < k$

Mathematically, we can express the combine operation as shown by the following two equations:

$$\hat{f}_{\mathcal{S}_C}(e) = \begin{cases} \hat{f}_{\mathcal{S}_1}(e) + \hat{f}_{\mathcal{S}_2}(e), & e \in \Sigma_1 \cap \Sigma_2, \\ \hat{f}_{\mathcal{S}_1}(e) + \hat{f}^{\min}_{\mathcal{S}_2}, & e \in \Sigma_1 \setminus \Sigma_2, \\ \hat{f}_{\mathcal{S}_2}(e) + \hat{f}^{\min}_{\mathcal{S}_1}, & e \in \Sigma_2 \setminus \Sigma_1, \end{cases} \tag{8}$$

$$\hat{\varepsilon}_{\mathcal{S}_C}(e) = \begin{cases} \hat{\varepsilon}_{\mathcal{S}_1}(e) + \hat{\varepsilon}_{\mathcal{S}_2}(e), & e \in \Sigma_1 \cap \Sigma_2, \\ \hat{\varepsilon}_{\mathcal{S}_1}(e) + \hat{f}^{\min}_{\mathcal{S}_2}, & e \in \Sigma_1 \setminus \Sigma_2, \\ \hat{\varepsilon}_{\mathcal{S}_2}(e) + \hat{f}^{\min}_{\mathcal{S}_1}, & e \in \Sigma_2 \setminus \Sigma_1. \end{cases} \tag{9}$$

As shown in eq. (8), if an item belongs to both summaries, we update its estimated frequency by summing up the estimated frequencies of the counters corresponding to the item in the two summaries. If an item belongs to only one summary, we update its estimated frequency by adding the minimum frequency stored in the other summary. At the same time we can estimate the error for each item, as shown by eq. (9). This combining step leads to a summary $\mathcal{S}_C$ storing at most $2k$ distinct items. This summary includes all of the frequent items belonging to the union of the underlying sets of the two input summaries.

Lemma 1. Let $\mathcal{S}_1 = (\Sigma_1, \hat{f}_{\mathcal{S}_1})$ and $\mathcal{S}_2 = (\Sigma_2, \hat{f}_{\mathcal{S}_2})$ be two summaries related respectively to the input sub-arrays $\mathcal{N}_1 = (N_1, f_{\mathcal{N}_1})$ and $\mathcal{N}_2 = (N_2, f_{\mathcal{N}_2})$, with $\mathcal{N} = \mathcal{N}_1 \uplus \mathcal{N}_2 = (N, f_{\mathcal{N}})$. Let $\mathcal{S}_C = (\Sigma_C, \hat{f}_{\mathcal{S}_C})$ be the intermediate summary obtained combining $\mathcal{S}_1$ and $\mathcal{S}_2$, and let $\delta = \hat{f}^{\min}_{\mathcal{S}_1} + \hat{f}^{\min}_{\mathcal{S}_2}$. The following relation holds:

$$|\mathcal{S}_C| = |\mathcal{S}_1| + |\mathcal{S}_2| + x\delta, \tag{10}$$

where $x = |\Sigma_C| - k$.

Proof. Let $c = |\Sigma_1 \cap \Sigma_2|$, $d_1 = |\Sigma_1 \setminus \Sigma_2|$ and $d_2 = |\Sigma_2 \setminus \Sigma_1|$. Then, $x = c + d_1 + d_2 - k$. It follows that

$$x\delta = (c + d_1 + d_2 - k)\delta = (c + d_1)\hat{f}^{\min}_{\mathcal{S}_1} + (c + d_2)\hat{f}^{\min}_{\mathcal{S}_2} - k\hat{f}^{\min}_{\mathcal{S}_1} - k\hat{f}^{\min}_{\mathcal{S}_2} + d_1\hat{f}^{\min}_{\mathcal{S}_2} + d_2\hat{f}^{\min}_{\mathcal{S}_1}. \tag{11}$$

Since $|\Sigma_1| = c + d_1 \leq k$ and $|\Sigma_2| = c + d_2 \leq k$, and observing that $|\Sigma_1| < k \Leftrightarrow \hat{f}^{\min}_{\mathcal{S}_1} = 0$ and $|\Sigma_2| < k \Leftrightarrow \hat{f}^{\min}_{\mathcal{S}_2} = 0$, it follows that eq. (11) reduces to

$$x\delta = d_1\hat{f}^{\min}_{\mathcal{S}_2} + d_2\hat{f}^{\min}_{\mathcal{S}_1}. \tag{12}$$

Therefore, we can rewrite eq. (10) as

$$|\mathcal{S}_C| = |\mathcal{S}_1| + |\mathcal{S}_2| + d_1\hat{f}^{\min}_{\mathcal{S}_2} + d_2\hat{f}^{\min}_{\mathcal{S}_1}. \tag{13}$$

This equation expresses the fact that the sum of the frequencies stored in $\mathcal{S}_C$ can be computed according to the way we combine the summaries in eq. (8). Precisely, if $d_1 = 0$ and $d_2 = 0$, then $\mathcal{S}_1$ and $\mathcal{S}_2$ share all of the elements, so that

$$|\mathcal{S}_C| = |\mathcal{S}_1| + |\mathcal{S}_2|. \tag{14}$$

Otherwise, for items belonging to just one of the summaries, we add to their frequencies the minimum frequency of the other summary. In other words, besides their frequency (which is taken into account by $|\mathcal{S}_1| + |\mathcal{S}_2|$) we also add exactly $d_1\hat{f}^{\min}_{\mathcal{S}_2} + d_2\hat{f}^{\min}_{\mathcal{S}_1}$.

Lemma 2. Let the summaries $\mathcal{S}_1$, $\mathcal{S}_2$ and $\mathcal{S}_C$, the input multisets $\mathcal{N}_1$ and $\mathcal{N}_2$ and the quantity $\delta$ be defined as in Lemma 1. Assume that the following inequalities hold for each item $e \in N_1$:

$$\hat{f}_{\mathcal{S}_1}(e) - \hat{f}^{\min}_{\mathcal{S}_1} \leq \hat{f}_{\mathcal{S}_1}(e) - \hat{\varepsilon}_{\mathcal{S}_1}(e) \leq f_{\mathcal{N}_1}(e) \leq \hat{f}_{\mathcal{S}_1}(e), \quad e \in \Sigma_1, \tag{15}$$

$$f_{\mathcal{N}_1}(e) \leq \hat{f}^{\min}_{\mathcal{S}_1}, \quad e \notin \Sigma_1. \tag{16}$$

Similarly, assume that the following inequalities hold for each item $e \in N_2$:

$$\hat{f}_{\mathcal{S}_2}(e) - \hat{f}^{\min}_{\mathcal{S}_2} \leq \hat{f}_{\mathcal{S}_2}(e) - \hat{\varepsilon}_{\mathcal{S}_2}(e) \leq f_{\mathcal{N}_2}(e) \leq \hat{f}_{\mathcal{S}_2}(e), \quad e \in \Sigma_2, \tag{17}$$

$$f_{\mathcal{N}_2}(e) \leq \hat{f}^{\min}_{\mathcal{S}_2}, \quad e \notin \Sigma_2. \tag{18}$$

Then, for each item $e \in N$ we have:

$$\hat{f}_{\mathcal{S}_C}(e) - \delta \leq \hat{f}_{\mathcal{S}_C}(e) - \hat{\varepsilon}_{\mathcal{S}_C}(e) \leq f_{\mathcal{N}}(e) \leq \hat{f}_{\mathcal{S}_C}(e), \quad e \in \Sigma_C, \tag{19}$$

$$f_{\mathcal{N}}(e) \leq \delta, \quad e \notin \Sigma_C. \tag{20}$$

Proof. The summary $\mathcal{S}_C$ is derived from $\mathcal{S}_1$ and $\mathcal{S}_2$ by applying eqs. (8) and (9), so that, in order to prove eq. (19), we need to distinguish three cases:

1. Let $e \in \Sigma_1 \cap \Sigma_2$. Summing eqs. (15) and (17) gives
$$\hat{f}_{\mathcal{S}_1}(e) + \hat{f}_{\mathcal{S}_2}(e) - (\hat{f}^{\min}_{\mathcal{S}_1} + \hat{f}^{\min}_{\mathcal{S}_2}) \leq \hat{f}_{\mathcal{S}_1}(e) + \hat{f}_{\mathcal{S}_2}(e) - (\hat{\varepsilon}_{\mathcal{S}_1}(e) + \hat{\varepsilon}_{\mathcal{S}_2}(e)) \leq f_{\mathcal{N}_1}(e) + f_{\mathcal{N}_2}(e) \leq \hat{f}_{\mathcal{S}_1}(e) + \hat{f}_{\mathcal{S}_2}(e).$$
But in this case, by definition, $\hat{f}_{\mathcal{S}_C}(e) = \hat{f}_{\mathcal{S}_1}(e) + \hat{f}_{\mathcal{S}_2}(e)$, $\hat{\varepsilon}_{\mathcal{S}_C}(e) = \hat{\varepsilon}_{\mathcal{S}_1}(e) + \hat{\varepsilon}_{\mathcal{S}_2}(e)$, $f_{\mathcal{N}}(e) = f_{\mathcal{N}_1}(e) + f_{\mathcal{N}_2}(e)$ and $\delta = \hat{f}^{\min}_{\mathcal{S}_1} + \hat{f}^{\min}_{\mathcal{S}_2}$, so that, taking into account eqs. (15) and (17), eq. (19) holds;

2. Let $e \in \Sigma_1 \setminus \Sigma_2$: following the same reasoning as before, taking into account eqs. (15) and (18) and that in this case by definition $\hat{f}_{\mathcal{S}_C}(e) = \hat{f}_{\mathcal{S}_1}(e) + \hat{f}^{\min}_{\mathcal{S}_2}$ and $\hat{\varepsilon}_{\mathcal{S}_C}(e) = \hat{\varepsilon}_{\mathcal{S}_1}(e) + \hat{f}^{\min}_{\mathcal{S}_2}$, we obtain
$$\hat{f}_{\mathcal{S}_1}(e) - \hat{f}^{\min}_{\mathcal{S}_1} \leq \hat{f}_{\mathcal{S}_1}(e) - \hat{\varepsilon}_{\mathcal{S}_1}(e) \leq f_{\mathcal{N}_1}(e) + f_{\mathcal{N}_2}(e) \leq \hat{f}_{\mathcal{S}_1}(e) + \hat{f}^{\min}_{\mathcal{S}_2},$$
so that
$$\hat{f}_{\mathcal{S}_1}(e) - \hat{f}^{\min}_{\mathcal{S}_1} \leq \hat{f}_{\mathcal{S}_1}(e) - \hat{\varepsilon}_{\mathcal{S}_1}(e) \leq f_{\mathcal{N}}(e) \leq \hat{f}_{\mathcal{S}_C}(e).$$
Rewriting $\hat{f}_{\mathcal{S}_1}(e) - \hat{f}^{\min}_{\mathcal{S}_1}$ as $\hat{f}_{\mathcal{S}_1}(e) + \hat{f}^{\min}_{\mathcal{S}_2} - \hat{f}^{\min}_{\mathcal{S}_2} - \hat{f}^{\min}_{\mathcal{S}_1}$, and $\hat{f}_{\mathcal{S}_1}(e) - \hat{\varepsilon}_{\mathcal{S}_1}(e)$ as $\hat{f}_{\mathcal{S}_1}(e) + \hat{f}^{\min}_{\mathcal{S}_2} - \hat{f}^{\min}_{\mathcal{S}_2} - \hat{\varepsilon}_{\mathcal{S}_1}(e)$, we obtain eq. (19);

3. Let $e \in \Sigma_2 \setminus \Sigma_1$: immediate, taking into account eqs. (16) and (17), since this case is symmetric to the previous one.

To prove eq. (20), taking into account eqs. (16) and (18) we obtain for an item $e \notin \Sigma_C$: $f_{\mathcal{N}_1}(e) + f_{\mathcal{N}_2}(e) \leq \hat{f}^{\min}_{\mathcal{S}_1} + \hat{f}^{\min}_{\mathcal{S}_2}$, i.e., $f_{\mathcal{N}}(e) \leq \delta$.

Now we can formally prove the correctness of our parallel algorithm. Let us consider how it works. Before engaging in the parallel reduction, each processor applies the sequential Space Saving algorithm to its local input, producing a hash table data structure containing at most $k$ counters with estimated frequency greater than zero. In the parallel reduction, we merge pairs of data structures until we output the final result. We start by proving the following

Theorem 3. A single reduction sub-step correctly merges its two input summaries.

Proof. Let $\mathcal{S}_1 = (\Sigma_1, \hat{f}_{\mathcal{S}_1})$ and $\mathcal{S}_2 = (\Sigma_2, \hat{f}_{\mathcal{S}_2})$ be two summaries related respectively to the input sub-arrays $\mathcal{N}_1 = (N_1, f_{\mathcal{N}_1})$ and $\mathcal{N}_2 = (N_2, f_{\mathcal{N}_2})$, with $\mathcal{N} = \mathcal{N}_1 \uplus \mathcal{N}_2 = (N, f_{\mathcal{N}})$. Let $\mathcal{S}_C = (\Sigma_C, \hat{f}_{\mathcal{S}_C})$ be the intermediate summary obtained combining $\mathcal{S}_1$ and $\mathcal{S}_2$ by using the COMBINE function, and let $\mathcal{S}_M = (\Sigma_M, \hat{f}_{\mathcal{S}_M})$ be the final merged summary.

We are going to prove that if eqs. (5) - (7) hold for $\mathcal{S}_1$ and $\mathcal{S}_2$ and if a relaxed version of eq. (4) holds, i.e., for a summary $\mathcal{S}$ it holds that

$$|\mathcal{S}| \leq |\mathcal{N}|, \tag{21}$$

then these properties continue to be true also for $\mathcal{S}_M$ (it is worth noting here that eq. (21) also holds for summaries produced by the sequential Space Saving algorithm). We shall show that this is enough to guarantee the correctness of the merge operation.

The merge operation is done in two steps and provides as output a summary of at most $k$ items. In the first step we combine the input summaries as shown in eqs. (8) and (9). This combining step leads to an intermediate summary $\mathcal{S}_C$ storing at most $2k$ distinct items.
In the second and final step, we analyze $\mathcal{S}_C$ in order to return the final output. If $\mathcal{S}_C$ holds at most $k$ entries (i.e., $|\Sigma_C| \leq k$), we return $\mathcal{S}_C$ as the output, i.e., $\mathcal{S}_M = \mathcal{S}_C$. However, if $\mathcal{S}_C$ holds more than $k$ entries (the data structure may hold in the worst case up to $2k$ entries), we need to select and return $k$ entries. In this case, we simply return as $\mathcal{S}_M$ the last $k$ entries in $\mathcal{S}_C$, i.e., those corresponding to the items with greatest frequency (the entries are sorted by counters' frequency).

We start by noting that in the summary $\mathcal{S}_C$ generated by the first step a counter's frequency still represents an excess estimation of the monitored item, as in Space Saving. As before, let $\delta = \hat{f}^{\min}_{\mathcal{S}_1} + \hat{f}^{\min}_{\mathcal{S}_2}$, and $x = |\Sigma_C| - k$. By Lemma 1, eq. (10), if $|\Sigma_C| \leq k$, then $x\delta = 0$ (indeed, when $|\Sigma_C| < k$ then $\delta = 0$, when $|\Sigma_C| = k$ then $x = 0$) and the merged summary $\mathcal{S}_M$ coincides with $\mathcal{S}_C$. In that case, since by eq. (21) $|\mathcal{S}_1| \leq |\mathcal{N}_1|$ and $|\mathcal{S}_2| \leq |\mathcal{N}_2|$, we have that $|\mathcal{S}_M| = |\mathcal{S}_1| + |\mathcal{S}_2| \leq |\mathcal{N}|$, so that eq. (21) also holds for $\mathcal{S}_M$.

Otherwise, if $|\Sigma_C| > k$, in order to obtain the final merged summary, we return in $\mathcal{S}_M$ the $k$ items in $\mathcal{S}_C$ with the highest frequencies. Precisely, let the entries in $\mathcal{S}_C$ be sorted in ascending order with regard to the counters' frequencies. Then,

$$|\mathcal{S}_M| = |\mathcal{S}_1| + |\mathcal{S}_2| + x\delta - \sum_{i=1}^{x} \hat{f}_{\mathcal{S}_C}(e_i), \tag{22}$$

where the sum is extended over the first $x$ entries. We observe that $\sum_{i=1}^{x} \hat{f}_{\mathcal{S}_C}(e_i) \geq x\delta$, owing to the fact that the counters appear in sorted order, and the estimated frequencies stored in each of the initial $x$ counters we are discarding are greater than or equal to $\delta$. In this case too, it follows that eq. (21) holds for $\mathcal{S}_M$. Indeed,

$$|\mathcal{S}_M| \leq |\mathcal{S}_1| + |\mathcal{S}_2| \leq |\mathcal{N}_1| + |\mathcal{N}_2| = |\mathcal{N}|. \tag{23}$$

We have to prove that the other properties are verified as well. In particular we have to show that the error bound guaranteed by the sequential Space Saving algorithm is preserved by the merge operation.
In order to do this, we observe that $\hat{f}^{\min}_{\mathcal{S}_M}$ is such that (see the similar proof of Lemma 3.3 in [27])

$$\hat{f}^{\min}_{\mathcal{S}_M} = \frac{|\mathcal{S}_M| - \sum_{e \in \Sigma_M} \left( \hat{f}_{\mathcal{S}_M}(e) - \hat{f}^{\min}_{\mathcal{S}_M} \right)}{k}. \tag{24}$$

At the same time, $\sum_{e \in \Sigma_M} ( \hat{f}_{\mathcal{S}_M}(e) - \hat{f}^{\min}_{\mathcal{S}_M} ) \geq 0$, because the frequency of each item is greater than or equal to the minimum. Therefore we have:

$$\hat{f}^{\min}_{\mathcal{S}_M} \leq \frac{|\mathcal{S}_M|}{k}. \tag{25}$$

Observing that $\hat{f}^{\min}_{\mathcal{S}_M} \geq \delta$ and taking into account eq. (25) and the fact that eq. (21) also holds for $\mathcal{S}_M$, we can bound $\hat{f}^{\min}_{\mathcal{S}_M}$ as follows:

$$\delta \leq \hat{f}^{\min}_{\mathcal{S}_M} \leq \frac{|\mathcal{S}_M|}{k} \leq \left\lfloor \frac{|\mathcal{N}|}{k} \right\rfloor. \tag{26}$$

At last, taking into account Lemma 2, eqs. (19) and (20) and the way we construct $\mathcal{S}_M$, we have that, for each item $e \in N$ (i.e., for each distinct item in the input $\mathcal{N} = \mathcal{N}_1 \uplus \mathcal{N}_2$):

$$\hat{f}_{\mathcal{S}_M}(e) - \hat{f}^{\min}_{\mathcal{S}_M} \leq \hat{f}_{\mathcal{S}_M}(e) - \hat{\varepsilon}_{\mathcal{S}_M}(e) \leq f_{\mathcal{N}}(e) \leq \hat{f}_{\mathcal{S}_M}(e), \quad e \in \Sigma_M, \tag{27}$$

and

$$f_{\mathcal{N}}(e) \leq \hat{f}^{\min}_{\mathcal{S}_M} \leq \left\lfloor \frac{|\mathcal{N}|}{k} \right\rfloor, \quad e \notin \Sigma_M, \tag{28}$$

showing that eqs. (5) - (7) also hold for $\mathcal{S}_M$.

It is worth noting here that a single reduction step (i.e., a parallel execution with $p = 2$ processors) is fully equivalent to a sequential algorithm for merging two data summaries. Therefore, Theorem 3 states the correctness of this algorithm. We can now prove the following

Proposition 4. The whole parallel reduction correctly merges its input summaries.

Proof. The correctness of the whole parallel reduction follows straightforwardly. Indeed, it is enough to note that in the initial step of the parallel reduction we process summaries derived by applying Space Saving locally in each processor, and eqs. (5) - (7) and (21) hold for these summaries. By Theorem 3, the merge operation used in each sub-step of the parallel reduction outputs a summary for which eqs. (5) - (7) and (21) continue to hold and whose error is still within the bound on the frequency estimation error guaranteed by Space Saving. Therefore, at the end of the $O(\log p)$ steps required for the whole reduction, the final output summary correctly provides the frequent items for the whole input.
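The structure of the reduction used in this argument, pairwise merges repeated until a single summary remains, can be illustrated with a small sequential simulation. This is only a sketch under stated assumptions: `tree_reduce` is a hypothetical helper, the `merge` argument stands in for the merge of two summaries, and in an actual message-passing run the merges of each round would execute on different processors rather than in a loop.

```python
def tree_reduce(values, merge):
    """Pairwise reduction of `values` with the binary function `merge`.

    Returns (result, rounds), where rounds is the number of reduction
    steps performed, i.e. ceil(log2(len(values))).
    """
    if not values:
        raise ValueError("need at least one value")
    level = list(values)
    rounds = 0
    while len(level) > 1:
        # Merge neighbours two by two; these merges are independent of
        # each other and could run concurrently.
        nxt = [merge(level[i], level[i + 1])
               for i in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            nxt.append(level[-1])  # unpaired value moves up unchanged
        level = nxt
        rounds += 1
    return level[0], rounds


if __name__ == "__main__":
    # Integer addition stands in for the merge of two summaries.
    print(tree_reduce(list(range(1, 9)), lambda a, b: a + b))
```

Because every round applies the same binary merge, any property preserved by one merge (such as the bounds of Theorem 3) is preserved after all rounds, which is the essence of the induction above.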
The main result of this Section is the following

Theorem 5. Algorithm 2 correctly determines frequent items in parallel.

Proof. The result follows immediately from Theorem 3 and Proposition 4.

5. Parallel complexity

In this Section, we discuss the parallel complexity of the proposed parallel Space Saving algorithm. We assume, in the following analysis, that $k = O(1)$. The assumption is justified by the fact that it holds in all of the cases of practical interest for this application.

At the beginning of the algorithm, the workload is balanced using a block distribution; this is done with two simple $O(1)$ assignments; therefore, the complexity of the initial domain decomposition is $O(1)$. Next, we determine local candidates in each subarray using the sequential Space Saving algorithm. Owing to the block distribution and to the fact that Space Saving is linear in the number of input elements, the complexity of this step is $O(n/p)$. Then, we engage in a parallel reduction to determine the global candidates for the whole input array. The whole reduction requires in the worst case $O(\log p)$ time.

Indeed, in the initial step we combine the input summaries in $O(k)$ time, by using hash tables. For each item in the $\mathcal{S}_1$ hash table, we try to find in $O(1)$ time a corresponding item in the $\mathcal{S}_2$ hash table. Then, we insert in the $\mathcal{S}_C$ hash table the entry for the item, again in $O(1)$ time and, if we have found the item, we delete the corresponding entry from $\mathcal{S}_2$ in $O(1)$ time. Since there are at most $k$ entries, this requires $O(k)$ time. We then scan the entries in $\mathcal{S}_2$ (there can be at most $k$ entries; this happens when the items in the two hash tables are all distinct, otherwise there will be fewer than $k$ entries because we remove corresponding items from $\mathcal{S}_2$ each time we find a match). For each entry in $\mathcal{S}_2$, we simply insert the corresponding item in $\mathcal{S}_C$ in $O(1)$ time. Therefore, processing $\mathcal{S}_2$ requires in the worst case $O(k)$ time.
In the second step, we simply return the combined summary if the total number of entries in SC is less than or equal to k; otherwise, we return the last k entries of SC in sorted order. The time required is O(k).

To recap, since we do O(k) work in each step of the parallel reduction, k = O(1) by assumption and there are O(log p) such steps, the overall complexity of the reduction is O(log p). The communication cost, i.e., the amount of data exchanged in the parallel reduction, is

$$\sum_{i=1}^{\log p} \frac{p}{2^i} k = (p-1)k = O(pk) = O(p), \tag{29}$$

since k = O(1) by assumption.

Finally, the worst case complexity of the Pruned function is O(k) = O(1), since this is just a linear scan in which we compare the frequency of each item against the threshold required to be a frequent item, and put an item in the result summary if its frequency is greater than or equal to the required threshold.

It follows that the overall complexity of the parallel Space Saving algorithm is O(n/p + log p). We are now in a position to state the following Theorem:

Theorem 6. The algorithm is cost–optimal for k = O(1).

Proof. Cost–optimality requires by definition that asymptotically p T_p = T_1, where T_1 represents the time spent on one processor (sequential time) and T_p the time spent on p processors. The sequential algorithm requires O(n) in the worst case, and the parallel complexity of our algorithm is O(n/p + log p) when k = O(1). It follows from the definition that the algorithm is cost–optimal for n = Ω(p log p).

Cost–optimality is an important theoretical property of parallel algorithms, since it implies linear speedup (equal to p) and efficiency equal to 1. Moreover, cost–optimality also implies good scalability of the algorithm when using smaller sized parallel machines equipped with a limited number of processors.
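As a quick numerical check of eq. (29) and of the cost model T_p = n/p + log p, the following Python sketch evaluates both for concrete values. The function names are hypothetical, p is assumed to be a power of two, and all constant factors hidden by the O notation are set to 1, so the numbers only illustrate the asymptotic behaviour.

```python
import math


def reduction_traffic(p, k):
    """Counters exchanged over the log2(p) reduction steps (p a power of two)."""
    steps = int(math.log2(p))
    return sum((p // 2**i) * k for i in range(1, steps + 1))


def model_efficiency(n, p):
    """Efficiency T1 / (p * Tp) under the model T1 = n, Tp = n/p + log2(p)."""
    return n / (p * (n / p + math.log2(p)))


traffic = reduction_traffic(8, 100)   # 4*100 + 2*100 + 1*100
eff = model_efficiency(24, 8)         # here n equals p * log2(p)
```

In this unit-constant model the efficiency is exactly 1/2 when n = p log p and approaches 1 as n grows beyond that value, which is the behaviour captured by the condition n = Ω(p log p).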
Indeed, scaling down a cost–optimal algorithm on a reduced number of processors will result in a fast algorithm, while scaling down a non cost–optimal algorithm may even result in a parallel algorithm doing more work and being slower than the corresponding best sequential algorithm.

We proceed with the analysis of isoefficiency and scalability. The sequential algorithm has complexity O(n); the parallel overhead is T_o = p T_p − T_1. In our case, T_o = p(n/p + log p) − n = p log p. The isoefficiency relation [22] is then n ≥ p log p. Finally, we derive the scalability function of this parallel system [32]. This function shows how memory usage per processor must grow to maintain efficiency at a desired level. If the isoefficiency relation is n ≥ f(p) and M(n) denotes the amount of memory required for a problem of size n, then M(f(p))/p shows how memory usage per processor must increase to maintain the same level of efficiency. Indeed, in order to maintain efficiency when increasing p, we must increase n as well, but on parallel computers the maximum problem size is limited by the available memory, which is linear in p. Therefore, when the scalability function M(f(p))/p is a constant C, the parallel algorithm is perfectly scalable; Cp represents instead the limit for scalable algorithms. Beyond this point an algorithm is not scalable (from this point of view). In our case the function describing how much memory is used for a problem of size n is given by M(n) = n. Therefore, M(f(p))/p = O(log p), with f(p) given by the isoefficiency relation.

6. Conclusions

To the best of our knowledge, we have designed and implemented the first message-passing based parallel version of the Space Saving algorithm to solve the k–majority problem. In particular, we have shown that our algorithm retains all of the key features of the sequential Space Saving algorithm.
Besides proving its formal correctness, we have applied our algorithm to the detection of frequent items in real datasets and in synthetic datasets whose probability distribution functions are a Hurwitz and a Zipf distribution respectively. Extensive experimental results on both synthetic and real datasets are reported and discussed in the Appendix, clearly showing that our algorithm outperforms the parallel version of the algorithm designed by Agarwal et al. with regard to precision, total error and average relative error, while providing overall comparable parallel performances with linear speedup.

Acknowledgment

We are indebted to the anonymous referees for enlightening observations, which helped us to improve the paper. The authors would also like to thank G. Cormode and M. Hadjieleftheriou for making freely available their sequential implementation of the Space Saving algorithm. We are also grateful to Prof. Palpanas of Paris Descartes University for providing us with the real datasets used in the experiments. The research of M. Cafaro has been supported by CMCC, Italy, under the grant FISR Gemina project, Italian Ministry of Education, University and Research. The research of P. Tempesta has been supported by the grant FIS2011–22566, Ministerio de Ciencia e Innovación, Spain.

A. Experimental results

We report here the experimental results we have obtained running the parallel Space Saving algorithm on an IBM iDataPlex cluster. Each SMP node is configured with two 2.6 GHz octa-core Xeon Sandy Bridge E5-2670 CPUs with 20 MB level 3 cache and 64 GB of main memory. The interconnection network is Infiniband 4x FDR-10 (Fourteen Data Rate) 40 Gb/s, which provides 5 GB/s unidirectional bandwidth. Our parallel implementation, developed in C++ using MPI, is based on the sequential source code for the Space Saving algorithm developed in [9].
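For readers who want the sequential building block at hand, the following minimal Python sketch illustrates the standard Space Saving update rule of [27] that each processor applies to its local sub-array. It is only an illustration and not the optimized C++ code of [9]: the function name and the `(estimated frequency, error)` tuple layout are hypothetical, and the minimum is found with a linear scan, whereas an efficient implementation would use a dedicated structure to locate it in constant time.

```python
def space_saving(stream, k):
    """Return {item: (estimated frequency, error)} using at most k counters."""
    counters = {}
    for x in stream:
        if x in counters:
            f, e = counters[x]
            counters[x] = (f + 1, e)        # monitored item: increment
        elif len(counters) < k:
            counters[x] = (1, 0)            # free counter available
        else:
            # Replace an item with minimum estimated frequency; this linear
            # scan costs O(k) and is used here only for simplicity.
            victim = min(counters, key=lambda y: counters[y][0])
            m, _ = counters.pop(victim)
            counters[x] = (m + 1, m)        # new item inherits the minimum
    return counters


summary = space_saving("aabacabad", k=2)
```

In the example, the estimated frequencies sum to the stream length, and for every monitored item the true frequency lies between the estimate minus the error and the estimate, which are the properties the merge operation is required to preserve.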
In order to assess the merits of our parallel algorithm, we also compare it with a second parallel algorithm, which we have designed and implemented starting from a sequential algorithm by Agarwal et al. [1]. The authors designed their algorithm for merging Frequent summaries, and then proved that, for Space Saving summaries, subtracting the minima from their respective summaries (if a summary possesses k counters) makes them isomorphic to Frequent summaries, so that their algorithm can be reused (see Lemma 2 in [1]).

In the ParallelAgarwal algorithm (see Algorithm 5) each processor starts by executing the Space Saving algorithm on its local sub-array. Then, just before engaging in the parallel reduction, if the local summary holds k nonzero counters, the minimum frequency, which is stored in the first counter local[1], is subtracted from each counter. It follows that the local summary stores at most k − 1 nonzero counters, so that the algorithm by Agarwal et al., shown as AgarwalParallelReduction (see Algorithm 6), can be applied. The input of the parallel reduction is a hash table storing the entries in local, sorted by counters' frequency.

Algorithm 5 Parallel algorithm by Agarwal et al.
Require: N, an array; n, the length of N; p, the number of processors; k, the k-majority parameter
Ensure: a hash table containing k–majority candidate elements
procedure ParallelAgarwal(N, n, p, k)
    ⊲ The n elements of the input array N are distributed to the p processors so that each one is responsible for either ⌊n/p⌋ or ⌈n/p⌉ elements; let left and right be respectively the indices of the first and last element of the sub-array handled by the process with rank id; ranks are numbered from 0 to p − 1
    left ← ⌊id · n/p⌋
    right ← ⌊(id + 1) · n/p⌋ − 1
    local ← SpaceSaving(N, left, right)    ⊲ determine local candidates
    if local.nz == k then    ⊲ local.nz is the number of items in the stream summary local with nonzero frequency
        m1 ← local[1].f̂
        for i = 1 to k do
            local[i].f̂ ← local[i].f̂ − m1
        end for
    end if
    let hash be a hash table storing <item, counter> pairs in local
    sort hash by counters' frequency in ascending order
    global ← AgarwalParallelReduction(hash, k)    ⊲ determine the global candidates for the whole array
    if id == 0 then    ⊲ we assume here that the processor with rank 0 contains the final result of the parallel reduction
        return global
    end if
end procedure

Although the algorithm is presented in the context of merging two summaries, it can actually be used in parallel as a reduction operator, owing to the fact that the authors also proved a bound on the output error, which is within the error affecting the input summaries.

The parallel reduction works as follows. It starts by combining the two data sets, calling the Agarwal-Combine function. Let S be the combined summary. Scanning the first hash table, for each item in S1 the function checks if the item also appears in S2. In this case, it inserts the entry for the item in S, storing as its estimated frequency the sum of the item's frequency and the frequency of the corresponding item in S2, and removes the item from S2. Otherwise, the function inserts the entry for the item, storing as its estimated frequency its frequency in S1. The function then scans the second hash table. Since each time an item in S1 was also present in S2 it was removed from S2, now S2 contains only items that do not appear in S1. For each item in S2 it simply inserts the item in S, and in the corresponding counter it stores as estimated frequency its frequency in S2. This could entail the use of up to 2k − 2 counters in the worst case, when S1 and S2 share no item. The combine step can be performed with a constant number of sorts and scans of summaries of size O(k).

Algorithm 6 Parallel Reduction by Agarwal et al.
Require: S1, S2: hash tables; k, the k-majority parameter (the number of counters is at most k − 1)
Ensure: a hash table containing k–majority candidate elements
procedure AgarwalParallelReduction(S1, S2, k)    ⊲ a merged summary of S1 and S2
    S ← Agarwal-Combine(S1, S2)
    sort S by counters' frequency in ascending order
    if S.nz ≤ k − 1 then
        return S
    else    ⊲ prune counters in S
        excess ← S.nz − k + 1    ⊲ determine frequency to be subtracted
        entry ← S[excess]
        counter ← entry.val
        freq ← counter.f̂    ⊲ subtract this frequency from the last k − 1 counters
        for i = excess + 1 to S.nz do
            entry ← S[i]
            item ← entry.key
            counter ← entry.val
            frequency ← counter.f̂
            S.Update(item, frequency − freq)
        end for
        remove first excess items from S
        return S
    end if
end procedure

Let S.nz be the number of counters in S. The entries in S are sorted by the counters' frequency in ascending order and, if S.nz ≤ k − 1, the algorithm returns S. Otherwise, a pruning operation is required: the algorithm subtracts from the last k − 1 counters the frequency of the (S.nz − k + 1)-th counter, removes the first S.nz − k + 1 counters and returns the remaining k − 1 counters, whose frequency has been corrected. The algorithm requires in the worst case time linear in the total number of counters, i.e., O(k) if implemented as described in [1] using a hash table.

In the experiments, we tested our algorithm against the one from Agarwal et al. on both synthetic and real datasets. Regarding synthetic datasets, the input distributions used in our experiments are the Riemann–Hurwitz distribution (Hurwitz for short) and its particular case, the Zipf distribution, which is one of the most used in experiments related to sequential algorithms for frequent items. We recall that the Zipf distribution is associated with the probability density function (p.d.f.)

$$P_Z(x) = \frac{x^{-(\rho+1)}}{\zeta(\rho+1)}, \quad x \geq 1, \tag{A.1}$$

where ρ is a positive real parameter controlling the skewness of the distribution and

$$\zeta(s) = \sum_{k=1}^{\infty} \frac{1}{k^s}, \quad \mathrm{Re}\, s > 1 \tag{A.2}$$

is the Riemann zeta function [23]. The Hurwitz distribution has p.d.f.
$$P_H(x, a) = \frac{x^{-(\rho+1)}}{\zeta_H(\rho+1, a)}, \quad x \geq 1, \tag{A.3}$$

where

$$\zeta_H(s, q) = \sum_{k=1}^{\infty} \frac{1}{(k+q)^s}, \quad \mathrm{Re}\, s > 1, \ \mathrm{Re}\, q > 0 \tag{A.4}$$

is the Riemann–Hurwitz zeta function. Both functions play a crucial role in analytic number theory [23] [36].

The real datasets we used come from different domains [12]. All of the datasets are publicly available, and two of them (Kosarak and Retail) have been widely used and reported in the data mining literature. Overall, the four datasets are characterized by a diversity of statistical characteristics, which we report in Table A.1.

Table A.1: Statistical characteristics of the real datasets

                  Kosarak    Retail     Q148         Nasa
Count             8019015    908576     234954       284170
Distinct items    41270      16470      11824        2116
Min               1          0          0            0
Max               41270      16469      149464496    28474
Mean              2387.2     3264.7     3392.9       353.9
Median            640        1564       63           120
Std. deviation    4308.5     4093.2     309782.5     778.1
Skewness          3.5        1.5        478.1        6.5

Kosarak: This is a click-stream dataset of a Hungarian online news portal. It has been anonymized, and consists of transactions, each of which is comprised of several integer items. In the experiments, we have considered every single item in serial order.

Retail: This dataset contains retail market basket data coming from an anonymous Belgian store. Again, we consider all of the items belonging to the dataset in serial order.

Q148: Derived from the KDD Cup 2000 data, compliments of Blue Martini, this dataset contains several attributes. The ones we use for our experiments are the values of the attribute Request Processing Time Sum (attribute number 148), coming from the "clicks" dataset. A pre-processing step was required in order to obtain the final dataset: we had to replace all of the missing values (appearing as question marks) with the value 0.

Nasa: Compliments of NASA and the Voyager 2 Triaxial Fluxgate Magnetometer principal investigator, Dr. Norman F. Ness, this dataset contains several attributes.
We selected the Field Magnitude (F1) and Field Modulus (F2) attributes from the Voyager 2 spacecraft Hourly Average Interplanetary Magnetic Field Data. A pre-processing step was required for this dataset: having selected the data for the years 1977-2004, we removed the unknown values (marked as 999), and multiplied all values by 1000 to convert them to integers (since the original values were real numbers with a precision of 3 decimal places). The values of the two attributes were finally concatenated. In our experiments, we read all of the values of the attribute F1, followed by all of the values of the attribute F2.

Denoting with $f$ the true frequency of an item and with $\hat{f}$ the corresponding frequency reported by an algorithm, the absolute error is, by definition, the difference $|f - \hat{f}|$. The (absolute) total error is then defined as the sum of the absolute errors related to the items reported by an algorithm. Similarly, the absolute relative error is defined as $\Delta f = |f - \hat{f}| / f$, and the average relative error is derived by averaging the absolute relative errors over all of the measured frequencies.

Precision is defined as the total number of true k-majority elements reported over the total number of items reported. Therefore, this metric quantifies the number of false positives reported by an algorithm in the output data summary. Recall is instead the total number of true k-majority elements reported over the number of true k-majority elements given by an exact algorithm. It follows that an algorithm is correct if and only if its recall is equal to 1 (or 100%); both algorithms under test have already been proved to be formally correct and their recall in all of the tests is indeed equal to 1.

A.1. Real datasets: error

In this Section, we report the experimental results obtained on the real datasets. We do not report on the performances, owing to the fact that processing the largest dataset on a single processor requires just a few milliseconds.
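The error metrics defined above (total error, average relative error, precision and recall) can be computed as in the following minimal Python sketch. The function name, the dictionary-based inputs and the explicit `threshold` argument are hypothetical; the strict comparison used to identify the true k-majority elements and the conventions adopted for empty sets (precision 0 when nothing is reported, recall 1 when there are no frequent items) are assumptions made only for this illustration.

```python
def error_metrics(true_freq, reported, threshold):
    """true_freq: item -> exact frequency; reported: item -> estimated frequency.

    Returns (total error, average relative error, precision, recall).
    """
    # Assumption: an item is a true k-majority element if f > threshold.
    frequent = {e for e, f in true_freq.items() if f > threshold}
    abs_err = {e: abs(true_freq.get(e, 0) - fh) for e, fh in reported.items()}
    total_error = sum(abs_err.values())
    # Relative errors are only defined for items that actually occur.
    rel = [abs_err[e] / true_freq[e] for e in reported if true_freq.get(e, 0) > 0]
    are = sum(rel) / len(rel) if rel else 0.0
    hits = sum(1 for e in reported if e in frequent)
    precision = hits / len(reported) if reported else 0.0
    recall = hits / len(frequent) if frequent else 1.0
    return total_error, are, precision, recall


metrics = error_metrics({"a": 10, "b": 6, "c": 1}, {"a": 11, "c": 3}, threshold=5)
```

In this toy example one of the two reported items is a false positive and one of the two frequent items is missed, so both precision and recall are 0.5; a correct algorithm in the sense used above would always yield recall equal to 1.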
Since the datasets are real, the only parameter we can vary is k. In the following tests, k has been varied from 100 to 1000, in steps of 100 (owing to the statistical characteristics of the real datasets). We report the total error, the precision and the average relative error (denoted from now on as ARE).

Figure A.1a presents the results related to the total error for the Kosarak and Nasa datasets, whilst Figure A.1b is related to the Q148 and Retail datasets. Note that we use a logarithmic scale for the total error values, since some of the curves would otherwise be too close to distinguish them. As shown, our algorithm outperforms the Agarwal et al. algorithm for all of the datasets under test, with very low and close to zero total error for both the Kosarak and Q148 datasets. The values for both the Nasa and the Retail datasets are about an order of magnitude smaller than the corresponding values obtained by Agarwal et al.

Regarding the precision, the results in Figures A.2a (Kosarak and Nasa datasets) and A.2b (Q148 and Retail datasets) are also clear evidence of the superiority of our algorithm. As shown, the algorithm by Agarwal et al. achieves a precision almost equal to zero for all of the datasets under test. Our algorithm exhibits a precision close to one for both Kosarak and Q148. For the Nasa dataset, the precision is between 0.55 and 0.85 for k in the range [100 - 300], and steadily increases towards 1.0 for k in the range [400 - 1000]. Similarly, for the Retail dataset, the precision is between 0.5 and 1.0 for k in the range [100 - 400], and steadily increases towards 0.7 for k in the range [500 - 1000].

Finally, Figures A.3a (Kosarak and Nasa datasets) and A.3b (Q148 and Retail datasets) are related to the average relative error, with our algorithm clearly outperforming the other. Our algorithm exhibits ARE values close to zero for both Kosarak and Q148. For the Nasa dataset, our algorithm's ARE values are steadily decreasing from 0.2 to 0.
The same behavior is observed for the Retail dataset, where our algorithm exhibits ARE close to zero for k = 100, close to one for k = 200, equal to 0.5 for k = 300 and then steadily decreasing ARE values from 0.62 to 0.26 in the range [300 - 1000].

Figure A.1: Real datasets: Total Error varying k on p = 8 cores. (a) Kosarak and Nasa datasets; (b) Q148 and Retail datasets.

Figure A.2: Real datasets: Precision varying k on p = 8 cores. (a) Kosarak and Nasa datasets; (b) Q148 and Retail datasets.

Figure A.3: Real datasets: Average Relative Error varying k on p = 8 cores. (a) Kosarak and Nasa datasets; (b) Q148 and Retail datasets.

A.2. Synthetic datasets: error

We have carried out several experiments, with the aim of analyzing the error committed by the algorithms under test. We have fixed a = 0.5 in all of the simulations involving the Hurwitz distribution. Indeed, for integer values of the parameter a, the Hurwitz distribution becomes the Zipf one (with a shifted value of the ρ parameter). As usual, we report the total error, precision and ARE.

The following experiments related to the error are characterized by the input size n, the parameter k and the skew ρ of the distribution; for each experiment we have determined the corresponding total error, average relative error and precision. In particular, in the first experiment we fixed n = 500,000,000 and ρ = 1.5, letting k vary from 1000 to 10,000 in steps of 1000. In the second experiment, ρ = 1.5, k = 2000 and n varies from 100,000,000 to 1,000,000,000 in steps of 100,000,000. Finally, in the third experiment we fixed n = 500,000,000 and k = 2000, and ρ varies from 0.5 to 3.0 in steps of 0.5. Table A.2 recaps the experiments carried out.

For each different value of k, n and ρ the algorithms have been run 20 times using a different seed for the pseudo-random generator associated to the distribution (using the same seeds in the corresponding executions of different algorithms).
For each input distribution generated, the algorithm has been run on up to 8 cores (one core per node), and the results have been averaged for each number of cores, over all of the runs. The input elements are 32-bit unsigned integers. We also computed for each mean the corresponding mean's 95% confidence interval (by using the Student t distribution). Even though we have determined the total error, ARE and precision for each different value of p = 1, . . . , 8, we only report here the results for p = 8 to save space, taking into account that the observed behavior did not change for p = 2, . . . , 7 (and, of course, the behavior for p = 1 was identical for both algorithms since no parallel reduction actually took place).

Table A.2: Design of error experiments for Zipfian and Hurwitz distributions

Experiment    n (millions)                   k (thousands)            ρ
1             500                            [1, 10] in steps of 1    1.5
2             [100, 1000] in steps of 100    2                        1.5
3             500                            2                        [0.5, 3.0] in steps of 0.5

We begin with the analysis of the total error. For Experiment 1, as shown in Figure A.4a, the total error committed by our algorithm for both input distributions is practically zero for every value of k, whilst the total error of the algorithm by Agarwal et al. decreases when k increases but still attains a very high value even for k = 10,000. Regarding Experiment 2, depicted in Figure A.5a, again our algorithm is affected by a total error close to zero for both input distributions independently of the value of n. On the contrary, the total error of the algorithm by Agarwal et al. steadily increases with n and is already very high even for the smallest value of n. In Experiment 3, for both input distributions as shown in Figure A.6a, our algorithm is affected by total error close to zero. The algorithm by Agarwal et al., on the other hand, performs well only for skew values in the set {2.5, 3}, whilst the total error explodes for values in the set {1, 1.5, 2}, attaining its maximum value for ρ = 1.
To recap, our algorithm outperforms the other with regard to the total error in all of the experiments.

Regarding the ARE, as shown in Figures A.4c, A.5c and A.6c, our algorithm clearly outperforms the other algorithm for both the input distributions, with ARE values practically equal to zero for the whole set of k and n values under test in Experiments 1 and 2. For Experiment 3, our algorithm shows an ARE value slightly greater than zero only for ρ = 0.5; however, it is worth noting here that ρ = 0.5 is the only case in which there are no frequent items.

Finally, we analyze the precision attained. As shown in Figures A.4b and A.5b, in Experiments 1 and 2 our algorithm clearly outperforms the algorithm by Agarwal et al. for both the input distributions. We obtain precision values equal to one for the whole set of k and n values under test, whilst the Agarwal et al. algorithm's precision is always less than 0.1. For Experiment 3, depicted in Figure A.6b, our algorithm provides excellent performances, with precision equal to one for skew values in the set {1, 1.5, 2, 2.5, 3}. We note here that the precision is zero for both algorithms when ρ = 0.5, which is consistent with our previous observation (when discussing the ARE values) that in this case there are no frequent items. The precision obtained using the algorithm by Agarwal et al. reaches its maximum value (less than 0.2 nevertheless) for ρ = 1, and then steadily decreases again.

Therefore, in each of the different scenarios, the precision provided by our parallel algorithm is for all practical purposes identical to the precision attained by the sequential Space Saving algorithm, so that our main goal when designing the algorithm has been achieved.
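For reference, the merge step of the baseline compared against in these experiments (Algorithm 6, after Agarwal et al. [1]) can be sketched in a few lines of Python. This is a minimal sketch under stated assumptions rather than the implementation used in the experiments: the function name is hypothetical, summaries are plain dictionaries mapping an item to its counter, and a full sort is used for brevity instead of the linear-time procedure described in [1].

```python
def agarwal_merge(s1, s2, k):
    """Merge two Frequent-style summaries, keeping at most k - 1 counters."""
    # Combine: sum the counters of items that appear in both summaries.
    combined = dict(s1)
    for item, f in s2.items():
        combined[item] = combined.get(item, 0) + f
    if len(combined) <= k - 1:
        return combined
    # Prune: sort in ascending order of frequency, subtract the frequency of
    # the excess-th counter from the last k - 1 counters, drop the others.
    ordered = sorted(combined.items(), key=lambda kv: kv[1])
    excess = len(ordered) - (k - 1)
    cut = ordered[excess - 1][1]
    return {item: f - cut for item, f in ordered[excess:]}


pruned = agarwal_merge({"a": 5, "b": 2}, {"a": 3, "c": 4, "d": 1}, k=3)
```

In the example the combined summary holds four counters, so the two smallest are dropped and the frequency of the second smallest (2) is subtracted from the surviving ones. This subtraction is what lowers the reported frequencies, which is consistent with the larger total error measured for this algorithm in the experiments above.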
Figure A.4: Experiment 1, Total Error, Precision and ARE varying k on p = 8 cores. (a) Total Error varying k; (b) Precision varying k; (c) ARE varying k.

Figure A.5: Experiment 2, Total Error, Precision and ARE varying n on p = 8 cores. (a) Total Error varying n; (b) Precision varying n; (c) ARE varying n.

Figure A.6: Experiment 3, Total Error, Precision and ARE varying ρ on p = 8 cores. (a) Total Error varying ρ; (b) Precision varying ρ; (c) ARE varying ρ.

A.3. Synthetic datasets: performances

We have designed and carried out some performance experiments characterized by the following parameters: the input size n, k and the skew ρ. For each input distribution generated, the algorithm has been run twenty times on up to 8 cores, and the results have been averaged for each number of cores, over all of the runs. The input elements are 32-bit unsigned integers. Table A.3 reports the values actually used in each of the performance experiments.

Table A.3: Design of performance experiments for Zipfian and Hurwitz distributions

Experiment    n                k        ρ
4             4,000,000,000    2,000    1.5
5             4,000,000,000    3,000    3.0

As shown in Table A.3, we have fixed n to 4 billion input items and, in each experiment, we vary the values of k and ρ. Figures A.7 and A.8, related to Experiments 4 and 5, respectively, show the performances for both the Zipfian and Hurwitz distributions with regard to running time, speedup and efficiency. Similar results were obtained with other settings.

It is immediate to verify that the performances of our parallel Space Saving algorithm are comparable to the performances of the algorithm by Agarwal et al. for both Zipfian and Hurwitz distributions, with regard to overall running time, speedup and efficiency. In particular, the measured speedup shows in general a linear behavior, with corresponding efficiency close to 1 (or 100%). It is worth noting here that both algorithms exhibit, in some cases, a slightly superlinear speedup.
This phenomenon, observed experimentally, is due to the cluster's memory hierarchy and to related cache effects. So-called superlinear speedups, i.e., speedups which are greater than the number of processors/cores [2], are a source of confusion because in theory this phenomenon is not possible according to Brent's principle [5] (which states that a single processor can simulate a p-processor algorithm with a uniform slowdown factor of p).

Experimentally, a superlinear speedup can be observed without violating Brent's principle when the storage space required to run the code on a particular instance exceeds the memory available on the single-processor machine, but not that of the parallel machine used for the simulation. In such a case, the sequential code needs to swap to secondary memory (disk) while the parallel code does not, therefore yielding a dramatic slowdown of the sequential code. On a more modest scale, the same problem could occur one level higher in the memory hierarchy, with the sequential code constantly cache-faulting while the parallel code can keep all of the required data in its cache subsystems. A sequential algorithm using M bytes of memory will use only M/p bytes on each processor of a p-processor parallel system, so that it is easier to keep all of the data in cache memory on the parallel machine. This is exactly what happened in our simulations.

We recall here that other possible sources of superlinear speedup include some brute–force search problems and the use of a suboptimal sequential algorithm. A parallel system might exhibit such behavior in search algorithms. In search problems performed by exhaustively looking for the solution, suppose the solution space is divided among the processors for each one to perform an independent search.
In a sequential implementation the different search spaces are attacked one after the other, while in parallel they can be done simultaneously, and one processor may find the solution almost immediately, yielding a superlinear speedup. A parallel system might also exhibit such behavior when using a suboptimal sequential algorithm: each processing element spends less than the time required by the sequential algorithm divided by p solving the problem. Generally, if a purely deterministic parallel algorithm were to achieve better than p times the speedup over the current sequential algorithm, the parallel algorithm (by Brent's principle) could be emulated on a single processor one parallel part after another, to achieve a faster serial program, which contradicts the assumption of an optimal serial program.

We recall here that in Experiment 5 we used a skew value equal to 3.0, which corresponds to highly skewed distributions of no real practical interest. However, we did these tests anyway for completeness, to test the performances of the algorithms also in this case.

Even though both algorithms under test show comparable performances for all practical purposes, our algorithm outperforms the one by Agarwal et al. with regard to the error committed, as shown in Section A.2.

Figure A.7: Experiment 4: Running Time, Speedup and Efficiency. (a) Zipfian; (b) Hurwitz; (c) Zipfian; (d) Hurwitz.

Figure A.8: Experiment 5: Running Time, Speedup and Efficiency. (a) Zipfian; (b) Hurwitz; (c) Zipfian; (d) Hurwitz.

References

[1] Pankaj K. Agarwal, Graham Cormode, Zengfeng Huang, Jeff Phillips, Zhewei Wei, and Ke Yi, Mergeable summaries, Proceedings of the 31st Symposium on Principles of Database Systems, 2012, pp. 23–34.
[2] David A. Bader, Bernard M.E. Moret, and Peter Sanders, Algorithm engineering for parallel computation, Experimental Algorithmics, 2002, pp. 1–23.
[3] Kevin Beyer and Raghu Ramakrishnan, Bottom–up computation of sparse and iceberg cubes, Proceedings of the ACM SIGMOD International Conference on Management of Data, ACM, New York, 1999, pp. 359–370.
[4] R. Boyer and J. Moore, Mjrty – a fast majority vote algorithm, Technical Report 32, Institute for Computing Science, University of Texas, Austin, 1981.
[5] Richard P. Brent, The Parallel Evaluation of General Arithmetic Expressions, J. ACM 21 (1974), no. 2, 201–206.
[6] Sergey Brin, Rajeev Motwani, Jeffrey D. Ullman, and Shalom Tsur, Dynamic itemset counting and implication rules for market basket data, SIGMOD '97: Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, 1997, pp. 255–264.
[7] Massimo Cafaro and Piergiulio Tempesta, Finding frequent items in parallel, Concurr. Comput.: Pract. Exper. 23 (October 2011), no. 15, 1774–1788.
[8] Moses Charikar, Kevin Chen, and Martin Farach-Colton, Finding frequent items in data streams, ICALP '02: Proceedings of the 29th International Colloquium on Automata, Languages and Programming, 2002, pp. 693–703.
[9] Graham Cormode and Marios Hadjieleftheriou, Finding frequent items in data streams, Proc. VLDB Endow. 1 (August 2008), 1530–1541.
[10] Graham Cormode and Marios Hadjieleftheriou, Finding the frequent items in streams of data, Commun. ACM 52 (2009), no. 10, 97–105.
[11] Graham Cormode and S. Muthukrishnan, An improved data stream summary: the count-min sketch and its applications, J. Algorithms 55 (2005), no. 1, 58–75.
[12] Michele Dallachiesa and Themis Palpanas, Identifying streaming frequent items in ad hoc time windows, Data Knowl. Eng. 87 (2013), 66–90.
[13] Sudipto Das, Shyam Antony, Divyakant Agrawal, and Amr El Abbadi, Thread cooperation in multicore architectures for frequency counting over multiple data streams, Proc. VLDB Endow. 2 (August 2009), no. 1, 217–228.
[14] Erik D. Demaine, Alejandro López-Ortiz, and J. Ian Munro, Frequency estimation of internet packet streams with limited space, ESA, 2002, pp. 348–360.
[15] Ugo Erra and Bernardino Frola, Frequent items mining acceleration exploiting fast parallel sorting on the GPU, Procedia Computer Science 9 (2012), no. 0, 86–95. Proceedings of the International Conference on Computational Science, ICCS 2012.
[16] Cristian Estan and George Varghese, New directions in traffic measurement and accounting, IMW '01: Proceedings of the 1st ACM SIGCOMM Workshop on Internet Measurement, 2001, pp. 75–80.
[17] Min Fang, Narayanan Shivakumar, Hector Garcia-Molina, Rajeev Motwani, and Jeffrey D. Ullman, Computing iceberg queries efficiently, Proceedings of the 24th International Conference on Very Large Data Bases, VLDB, Morgan–Kaufmann, San Mateo, Calif., 1998, pp. 299–310.
[18] M. Fischer and S. Salzberg, Finding a majority among n votes: Solution to problem 81–5, J. of Algorithms 3 (1982), 376–379.
[19] Alexander Gelbukh (ed.), Computational linguistics and intelligent text processing, 7th International Conference, CICLing 2006, Lecture Notes in Computer Science, vol. 3878, Springer–Verlag.
[20] Phillip B. Gibbons and Yossi Matias, Synopsis data structures for massive data sets, DIMACS: Series in Discrete Mathematics and Theoretical Computer Science: Special Issue on External Memory Algorithms and Visualization, vol. A, 1999, pp. 39–70.
[21] Naga K. Govindaraju, Nikunj Raghuvanshi, and Dinesh Manocha, Fast and approximate stream mining of quantiles and frequencies using graphics processors, Proceedings of the 2005 ACM SIGMOD International Conference on Management of Data, 2005, pp. 611–622.
[22] Ananth Grama, Anshul Gupta, and Vipin Kumar, Isoefficiency: Measuring the scalability of parallel algorithms and architectures, IEEE Parallel and Distributed Technology 1 (August 1993), no. 3, 12–21.
[23] Henryk Iwaniec and Emmanuel Kowalski, Analytic number theory, Amer. Math. Soc. Colloq. Publ., Amer. Math. Soc., Providence, RI, 2004.
[24] Richard M. Karp, Scott Shenker, and Christos H. Papadimitriou, A simple algorithm for finding frequent elements in streams and bags, ACM Trans. Database Syst. 28 (2003), no. 1, 51–55.
[25] Yao Li, Zhi-Heng Zhang, Wen-Bin Chen, and Fan Min, Tdup: an approach to incremental mining of frequent itemsets with three-way-decision pattern updating, International Journal of Machine Learning and Cybernetics (2015), 1–13.
[26] Gurmeet Singh Manku and Rajeev Motwani, Approximate frequency counts over data streams, VLDB, 2002, pp. 346–357.
[27] Ahmed Metwally, Divyakant Agrawal, and Amr El Abbadi, An integrated efficient solution for computing frequent and top-k elements in data streams, ACM Trans. Database Syst. 31 (September 2006), no. 3, 1095–1133.
[28] Jayadev Misra and David Gries, Finding repeated elements, Sci. Comput. Program. 2 (1982), no. 2, 143–152.
[29] Marghny H. Mohamed and Mohammed M. Darwieesh, Efficient mining frequent itemsets algorithms, International Journal of Machine Learning and Cybernetics 5 (2014), no. 6, 823–833.
[30] D. Mouillot and A. Lepretre, Introduction of relative abundance distribution (rad) indices, estimated from the rank-frequency diagrams (rfd), to assess changes in community diversity, Environmental Monitoring and Assessment 63 (2000), no. 2, 279–295.
[31] Rong Pan, Lee Breslau, Balaji Prabhakar, and Scott Shenker, Approximate fairness through differential dropping, SIGCOMM Comput. Commun. Rev. 33 (2003), no. 2, 23–39.
[32] Michael J. Quinn, Parallel programming in C with MPI and OpenMP, McGraw–Hill, 2003.
[33] Pratanu Roy, Jens Teubner, and Gustavo Alonso, Efficient frequent item counting in multi-core hardware, Proceedings of the 18th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, 2012, pp. 1451–1459.
[34] Apostolos Syropoulos, Mathematics of multisets, in Multiset Processing: Mathematical, Computer Science, and Molecular Computing Points of View, LNCS 2235, 2001, pp. 347–358.
[35] Kanat Tangwongsan, Srikanta Tirthapura, and Kun-Lung Wu, Parallel streaming frequency-based aggregates, Proceedings of the 26th ACM symposium on parallelism in algorithms and architectures, 2014, pp. 236–245.
[36] Piergiulio Tempesta, L-series and Hurwitz zeta functions associated with the universal formal group, Annali Sc. Normale Superiore, Classe di Scienze IX (2010), no. 1, 1–12.
[37] Bay Vo, Tuong Le, Frans Coenen, and Tzung-Pei Hong, Mining frequent itemsets using the n-list and subsume concepts, International Journal of Machine Learning and Cybernetics (2014), 1–13.
[38] Xian Wu, Wei Fan, Jing Peng, Kun Zhang, and Yong Yu, Iterative sampling based frequent itemset mining for big data, International Journal of Machine Learning and Cybernetics (2015), 1–8.
[39] Yu Zhang, Parallelizing the weighted lossy counting algorithm in high-speed network monitoring, Instrumentation, measurement, computer, communication and control (IMCCC), second international conference on, 2012, pp. 757–761.
[40] Yu Zhang, Yue Sun, Jianzhong Zhang, Jingdong Xu, and Ying Wu, An efficient framework for parallel and continuous frequent item monitoring, Concurrency and Computation: Practice and Experience 26 (2014), no. 18, 2856–2879.